From bccf087bc7fcf72830e6ea31a68eee76b389d160 Mon Sep 17 00:00:00 2001 From: aceld Date: Sat, 2 Feb 2019 17:36:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E6=B6=88=E6=81=AF=E5=B0=81?= =?UTF-8?q?=E8=A3=85=E6=9C=BA=E5=88=B6=20Zinx-V0.5=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils/globalobj.go | 47 ++++++--- ziface/{iconnnection.go => iconnection.go} | 6 +- ziface/idatapack.go | 11 +++ ziface/imessage.go | 14 +++ ziface/irequest.go | 1 + znet/connection.go | 70 ++++++++++++-- znet/datapack.go | 71 ++++++++++++++ znet/datepack_test.go | 107 +++++++++++++++++++++ znet/message.go | 46 +++++++++ znet/request.go | 9 +- znet/server_test.go | 58 +++++------ 11 files changed, 386 insertions(+), 54 deletions(-) rename ziface/{iconnnection.go => iconnection.go} (77%) create mode 100644 ziface/idatapack.go create mode 100644 ziface/imessage.go create mode 100644 znet/datapack.go create mode 100644 znet/datepack_test.go create mode 100644 znet/message.go diff --git a/utils/globalobj.go b/utils/globalobj.go index 45ad4c3..8c358aa 100644 --- a/utils/globalobj.go +++ b/utils/globalobj.go @@ -3,6 +3,7 @@ package utils import ( "encoding/json" "io/ioutil" + "os" "zinx/ziface" ) @@ -13,7 +14,7 @@ import ( type GlobalObj struct { /* Server - */ + */ TcpServer ziface.IServer //当前Zinx的全局Server对象 Host string //当前服务器主机IP TcpPort int //当前服务器主机监听端口号 @@ -21,10 +22,15 @@ type GlobalObj struct { /* Zinx + */ + Version string //当前Zinx版本号 + MaxPacketSize uint32 //都需数据包的最大值 + MaxConn int //当前服务器主机允许的最大链接个数 + + /* + config file path */ - Version string //当前Zinx版本号 - MaxPacketSize uint32 //都需数据包的最大值 - MaxConn int //当前服务器主机允许的最大链接个数 + ConfFilePath string } /* @@ -32,9 +38,27 @@ type GlobalObj struct { */ var GlobalObject *GlobalObj +//判断一个文件是否存在 +func PathExists(path string) (bool, error) { + _, err := os.Stat(path) + if err == nil { + return true, nil + } + if os.IsNotExist(err) { + return false, nil + } + return false, err +} + //读取用户的配置文件 func (g *GlobalObj) Reload() { - data, err := ioutil.ReadFile("conf/zinx.json") + + if confFileExists, _ := PathExists(g.ConfFilePath) ; confFileExists != true { + //fmt.Println("Config File ", g.ConfFilePath , " is not exist!!") + return + } + + data, err := ioutil.ReadFile(g.ConfFilePath) if err != nil { panic(err) } @@ -52,12 +76,13 @@ func (g *GlobalObj) Reload() { func init() { //初始化GlobalObject变量,设置一些默认值 GlobalObject = &GlobalObj{ - Name: "ZinxServerApp", - Version: "V0.4", - TcpPort: 7777, - Host: "0.0.0.0", - MaxConn: 12000, - MaxPacketSize:4096, + Name: "ZinxServerApp", + Version: "V0.4", + TcpPort: 7777, + Host: "0.0.0.0", + MaxConn: 12000, + MaxPacketSize: 4096, + ConfFilePath: "conf/zinx.json", } //从配置文件中加载一些用户配置的参数 diff --git a/ziface/iconnnection.go b/ziface/iconnection.go similarity index 77% rename from ziface/iconnnection.go rename to ziface/iconnection.go index f751ca0..83e4ba5 100644 --- a/ziface/iconnnection.go +++ b/ziface/iconnection.go @@ -14,10 +14,10 @@ type IConnection interface { GetConnID() uint32 //获取远程客户端地址信息 RemoteAddr() net.Addr - //直接将数据发送数据给远程的TCP客户端 - Send(data []byte) error + //直接将Message数据发送数据给远程的TCP客户端 + SendMsg(msgId uint32, data []byte) error //将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 - SendBuff(data []byte) error + //SendBuff(data []byte) error } diff --git a/ziface/idatapack.go b/ziface/idatapack.go new file mode 100644 index 0000000..8c0254c --- /dev/null +++ b/ziface/idatapack.go @@ -0,0 +1,11 @@ +package ziface + +/* + 封包数据和拆包数据 + 直接面向TCP连接中的数据流,为传输数据添加头部信息,用于处理TCP粘包问题。 + */ +type IDataPack interface{ + GetHeadLen() uint32 //获取包头长度方法 + Pack(msg IMessage)([]byte, error) //封包方法 + Unpack([]byte)(IMessage, error) //拆包方法 +} diff --git a/ziface/imessage.go b/ziface/imessage.go new file mode 100644 index 0000000..9071696 --- /dev/null +++ b/ziface/imessage.go @@ -0,0 +1,14 @@ +package ziface + +/* + 将请求的一个消息封装到message中,定义抽象层接口 + */ +type IMessage interface { + GetDataLen() uint32 //获取消息数据段长度 + GetMsgId() uint32 //获取消息ID + GetData() []byte //获取消息内容 + + SetMsgId(uint32) //设计消息ID + SetData([]byte) //设计消息内容 + SetDataLen(uint32) //设置消息数据段长度 +} diff --git a/ziface/irequest.go b/ziface/irequest.go index a87d505..842433f 100644 --- a/ziface/irequest.go +++ b/ziface/irequest.go @@ -7,5 +7,6 @@ package ziface type IRequest interface{ GetConnection() IConnection //获取请求连接信息 GetData() []byte //获取请求消息的数据 + GetMsgID() uint32 //获取请求的消息ID } diff --git a/znet/connection.go b/znet/connection.go index c40cf31..5292581 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -1,9 +1,10 @@ package znet import ( + "errors" "fmt" + "io" "net" - "zinx/utils" "zinx/ziface" ) @@ -49,17 +50,48 @@ func (c *Connection) StartReader() { for { //读取我们最大的数据到buf中 - buf := make([]byte, utils.GlobalObject.MaxPacketSize) - _, err := c.Conn.Read(buf) + //buf := make([]byte, utils.GlobalObject.MaxPacketSize) + //_, err := c.Conn.Read(buf) + //if err != nil { + // fmt.Println("recv buf err ", err) + // c.ExitBuffChan <- true + // continue + //} + // 创建拆包解包的对象 + dp := NewDataPack() + + //读取客户端的Msg head + headData := make([]byte, dp.GetHeadLen()) + if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { + fmt.Println("read msg head error ", err) + c.ExitBuffChan <- true + continue + } + + //拆包,得到msgid 和 datalen 放在msg中 + msg , err := dp.Unpack(headData) if err != nil { - fmt.Println("recv buf err ", err) + fmt.Println("unpack error ", err) c.ExitBuffChan <- true continue } + + //根据 dataLen 读取 data,放在msg.Data中 + var data []byte + if msg.GetDataLen() > 0 { + data = make([]byte, msg.GetDataLen()) + if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { + fmt.Println("read msg data error ", err) + c.ExitBuffChan <- true + continue + } + } + msg.SetData(data) + //得到当前客户端请求的Request数据 req := Request{ conn:c, - data:buf, + msg:msg, } //从路由Routers 中找到注册绑定Conn的对应Handle go func (request ziface.IRequest) { @@ -131,12 +163,30 @@ func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } -//直接将数据发送数据给远程的TCP客户端 -func (c *Connection) Send(data []byte) error { +//直接将Message数据发送数据给远程的TCP客户端 +func (c *Connection) SendMsg(msgId uint32, data []byte) error { + if c.isClosed == true { + return errors.New("Connection closed when send msg") + } + //将data封包,并且发送 + dp := NewDataPack() + msg, err := dp.Pack(NewMsgPackage(msgId, data)) + if err != nil { + fmt.Println("Pack error msg id = ", msgId) + return errors.New("Pack error msg ") + } + + //写回客户端 + if _, err := c.Conn.Write(msg); err != nil { + fmt.Println("Write msg id ", msgId, " error ") + c.ExitBuffChan <- true + return errors.New("conn Write error") + } + return nil } //将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 -func (c *Connection) SendBuff(data []byte) error { - return nil -} +//func (c *Connection) SendBuff(data []byte) error { +// return nil +//} diff --git a/znet/datapack.go b/znet/datapack.go new file mode 100644 index 0000000..fdbbd6d --- /dev/null +++ b/znet/datapack.go @@ -0,0 +1,71 @@ +package znet + +import ( + "bytes" + "encoding/binary" + "errors" + "zinx/utils" + "zinx/ziface" +) + +//封包拆包类实例,暂时不需要成员 +type DataPack struct {} + +//封包拆包实例初始化方法 +func NewDataPack() *DataPack { + return &DataPack{} +} + +//获取包头长度方法 +func(dp *DataPack) GetHeadLen() uint32 { + //Id uint32(4字节) + DataLen uint32(4字节) + return 8 +} +//封包方法(压缩数据) +func(dp *DataPack) Pack(msg ziface.IMessage)([]byte, error) { + //创建一个存放bytes字节的缓冲 + dataBuff := bytes.NewBuffer([]byte{}) + + //写msgID + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil { + return nil, err + } + + //写dataLen + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { + return nil, err + } + + //写data数据 + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { + return nil ,err + } + + return dataBuff.Bytes(), nil +} +//拆包方法(解压数据) +func(dp *DataPack) Unpack(binaryData []byte)(ziface.IMessage, error) { + //创建一个从输入二进制数据的ioReader + dataBuff := bytes.NewReader(binaryData) + + //只解压head的信息,得到dataLen和msgID + msg := &Message{} + + //读msgID + if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil { + return nil, err + } + + //读dataLen + if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil { + return nil, err + } + + //判断dataLen的长度是否超出我们允许的最大包长度 + if (utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize) { + return nil, errors.New("Too large msg data recieved") + } + + //这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据 + return msg, nil +} diff --git a/znet/datepack_test.go b/znet/datepack_test.go new file mode 100644 index 0000000..3e2436e --- /dev/null +++ b/znet/datepack_test.go @@ -0,0 +1,107 @@ +package znet + +import ( + "fmt" + "io" + "net" + "testing" +) + +//只是负责测试datapack拆包,封包功能 +func TestDataPack(t *testing.T) { + //创建socket TCP Server + listener, err := net.Listen("tcp", "127.0.0.1:7777") + if err != nil{ + fmt.Println("server listen err:", err) + return + } + + //创建服务器gotoutine,负责从客户端goroutine读取粘包的数据,然后进行解析 + go func (){ + for{ + conn, err := listener.Accept() + if err != nil{ + fmt.Println("server accept err:", err) + } + + //处理客户端请求 + go func(conn net.Conn){ + //创建封包拆包对象dp + dp := NewDataPack() + for{ + //1 先读出流中的head部分 + headData := make([]byte, dp.GetHeadLen()) + _, err := io.ReadFull(conn, headData) //ReadFull 会把msg填充满为止 + if err != nil { + fmt.Println("read head error") + } + //将headData字节流 拆包到msg中 + msgHead,err := dp.Unpack(headData) + if err != nil{ + fmt.Println("server unpack err:", err) + return + } + + if msgHead.GetDataLen() > 0 { + //msg 是有data数据的,需要再次读取data数据 + msg := msgHead.(*Message) + msg.Data = make([]byte, msg.GetDataLen()) + + //根据dataLen从io中读取字节流 + _, err := io.ReadFull(conn, msg.Data) + if err != nil { + fmt.Println("server unpack data err:", err) + return + } + + fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data)) + } + } + }(conn) + + } + }() + + //客户端goroutine,负责模拟粘包的数据,然后进行发送 + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil{ + fmt.Println("client dial err:", err) + return + } + + //创建一个封包对象 dp + dp := NewDataPack() + + //封装一个msg1包 + msg1 := &Message{ + Id:0, + DataLen:5, + Data:[]byte{'h', 'e', 'l', 'l', 'o'}, + } + + sendData1, err := dp.Pack(msg1) + if err!= nil{ + fmt.Println("client pack msg1 err:", err) + return + } + + msg2 := &Message{ + Id:1, + DataLen:7, + Data:[]byte{'w', 'o', 'r', 'l', 'd', '!', '!'}, + } + sendData2, err := dp.Pack(msg2) + if err!= nil{ + fmt.Println("client temp msg2 err:", err) + return + } + + //将sendData1,和 sendData2 拼接一起,组成粘包 + sendData1 = append(sendData1, sendData2...) + + //向服务器端写数据 + conn.Write(sendData1) + + //客户端阻塞 + select{} +} diff --git a/znet/message.go b/znet/message.go new file mode 100644 index 0000000..2f7430e --- /dev/null +++ b/znet/message.go @@ -0,0 +1,46 @@ +package znet + +type Message struct { + Id uint32 //消息的ID + DataLen uint32 //消息的长度 + Data []byte //消息的内容 +} + +//创建一个Message消息包 +func NewMsgPackage(id uint32, data []byte) *Message { + return &Message{ + Id: id, + DataLen: uint32(len(data)), + Data: data, + } +} + +//获取消息数据段长度 +func (msg *Message) GetDataLen() uint32 { + return msg.DataLen +} + +//获取消息ID +func (msg *Message) GetMsgId() uint32 { + return msg.Id +} + +//获取消息内容 +func (msg *Message) GetData() []byte { + return msg.Data +} + +//设置消息数据段长度 +func (msg *Message) SetDataLen(len uint32) { + msg.DataLen = len +} + +//设计消息ID +func (msg *Message) SetMsgId(msgId uint32) { + msg.Id = msgId +} + +//设计消息内容 +func (msg *Message) SetData(data []byte) { + msg.Data = data +} diff --git a/znet/request.go b/znet/request.go index 4d46cbf..278d34f 100644 --- a/znet/request.go +++ b/znet/request.go @@ -4,7 +4,7 @@ import "zinx/ziface" type Request struct { conn ziface.IConnection //已经和客户端建立好的 链接 - data []byte //客户端请求的数据 + msg ziface.IMessage //客户端请求的数据 } //获取请求连接信息 func(r *Request) GetConnection() ziface.IConnection { @@ -12,5 +12,10 @@ func(r *Request) GetConnection() ziface.IConnection { } //获取请求消息的数据 func(r *Request) GetData() []byte { - return r.data + return r.msg.GetData() +} + +//获取请求的消息的ID +func (r *Request) GetMsgID() uint32 { + return r.msg.GetMsgId() } \ No newline at end of file diff --git a/znet/server_test.go b/znet/server_test.go index 11f53d6..28e8e3f 100644 --- a/znet/server_test.go +++ b/znet/server_test.go @@ -10,40 +10,40 @@ import ( /* 模拟客户端 - */ - func ClientTest() { +*/ +func ClientTest() { - fmt.Println("Client Test ... start") - //3秒之后发起测试请求,给服务端开启服务的机会 - time.Sleep(3 * time.Second) + fmt.Println("Client Test ... start") + //3秒之后发起测试请求,给服务端开启服务的机会 + time.Sleep(3 * time.Second) - conn,err := net.Dial("tcp", "127.0.0.1:7777") - if err != nil { - fmt.Println("client start err, exit!") + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil { + fmt.Println("client start err, exit!") return } - for { - _, err := conn.Write([]byte("Zinx V0.2 test")) - if err !=nil { - fmt.Println("write error err ", err) - return + for { + _, err := conn.Write([]byte("Zinx V0.2 test")) + if err != nil { + fmt.Println("write error err ", err) + return } - buf :=make([]byte, 512) - cnt, err := conn.Read(buf) - if err != nil { - fmt.Println("read buf error ") - return + buf := make([]byte, 512) + cnt, err := conn.Read(buf) + if err != nil { + fmt.Println("read buf error ") + return } - fmt.Printf(" server call back : %s, cnt = %d\n", buf, cnt) + fmt.Printf(" server call back : %s, cnt = %d\n", buf, cnt) - time.Sleep(1*time.Second) + time.Sleep(1 * time.Second) } - } +} - /* +/* //Server 模块的测试函数 func TestServer(t *testing.T) { @@ -65,19 +65,21 @@ type PingRouter struct { BaseRouter } + //Test PreHandle func (this *PingRouter) PreHandle(request ziface.IRequest) { fmt.Println("Call Router PreHandle") _, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping ....\n")) - if err !=nil { + if err != nil { fmt.Println("call back ping ping ping error") } } + //Test Handle func (this *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle") _, err := request.GetConnection().GetTCPConnection().Write([]byte("ping...ping...ping\n")) - if err !=nil { + if err != nil { fmt.Println("call back ping ping ping error") } } @@ -86,14 +88,14 @@ func (this *PingRouter) Handle(request ziface.IRequest) { func (this *PingRouter) PostHandle(request ziface.IRequest) { fmt.Println("Call Router PostHandle") _, err := request.GetConnection().GetTCPConnection().Write([]byte("After ping .....\n")) - if err !=nil { + if err != nil { fmt.Println("call back ping ping ping error") } } -func TestServerV0_3(t *testing.T){ +func TestServerV0_3(t *testing.T) { //创建一个server句柄 - s := NewServer("[zinx V0.3]") + s := NewServer() s.AddRouter(&PingRouter{}) @@ -102,4 +104,4 @@ func TestServerV0_3(t *testing.T){ //2 开启服务 s.Serve() -} \ No newline at end of file +}