From 5836c612d0ddb0529ab7cb0fbd96b81bad132413 Mon Sep 17 00:00:00 2001 From: aceld Date: Wed, 13 Feb 2019 18:07:52 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=93=BE=E6=8E=A5=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- utils/globalobj.go | 2 ++ ziface/iconnection.go | 4 ++- ziface/iserver.go | 10 ++++++ znet/connection.go | 79 ++++++++++++++++++++++++++++++++----------- znet/server.go | 49 +++++++++++++++++++++++++-- 6 files changed, 123 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index fe89ecc..49536b3 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Zinx框架的项目制作采用编码和学习教程同步进行,将开发的 `mail`: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) `github`: -[https://github.com/aceld/zinx](https://github.com/aceld/zinx) +[https://github.com/aceld](https://github.com/aceld) `原创书籍gitbook`: [http://legacy.gitbook.com/@aceld](http://legacy.gitbook.com/@aceld) diff --git a/utils/globalobj.go b/utils/globalobj.go index c32f82a..4134d44 100644 --- a/utils/globalobj.go +++ b/utils/globalobj.go @@ -28,6 +28,7 @@ type GlobalObj struct { MaxConn int //当前服务器主机允许的最大链接个数 WorkerPoolSize uint32 //业务工作Worker池的数量 MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量 + MaxMsgChanLen uint32 //SendBuffMsg发送消息的缓冲最大长度 /* config file path @@ -87,6 +88,7 @@ func init() { ConfFilePath: "conf/zinx.json", WorkerPoolSize: 10, MaxWorkerTaskLen: 1024, + MaxMsgChanLen:1024, } //从配置文件中加载一些用户配置的参数 diff --git a/ziface/iconnection.go b/ziface/iconnection.go index 030fd68..ae644d4 100644 --- a/ziface/iconnection.go +++ b/ziface/iconnection.go @@ -14,8 +14,10 @@ type IConnection interface { GetConnID() uint32 //获取远程客户端地址信息 RemoteAddr() net.Addr - //直接将Message数据发送数据给远程的TCP客户端 + //直接将Message数据发送数据给远程的TCP客户端(无缓冲) SendMsg(msgId uint32, data []byte) error + //直接将Message数据发送给远程的TCP客户端(有缓冲) + SendBuffMsg(msgId uint32, data []byte) error } diff --git a/ziface/iserver.go b/ziface/iserver.go index 4765140..c162eac 100644 --- a/ziface/iserver.go +++ b/ziface/iserver.go @@ -10,4 +10,14 @@ type IServer interface{ Serve() //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 AddRouter(msgId uint32, router IRouter) + //得到链接管理 + GetConnMgr() IConnManager + //设置该Server的连接创建时Hook函数 + SetOnConnStart(func (IConnection)) + //设置该Server的连接断开时的Hook函数 + SetOnConnStop(func (IConnection)) + //调用连接OnConnStart Hook函数 + CallOnConnStart(conn IConnection) + //调用连接OnConnStop Hook函数 + CallOnConnStop(conn IConnection) } \ No newline at end of file diff --git a/znet/connection.go b/znet/connection.go index b61faba..db3aa74 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -10,6 +10,8 @@ import ( ) type Connection struct { + //当前Conn属于哪个Server + TcpServer ziface.IServer //当前连接的socket TCP套接字 Conn *net.TCPConn //当前连接的ID 也可以称作为SessionID,ID全局唯一 @@ -22,21 +24,27 @@ type Connection struct { ExitBuffChan chan bool //无缓冲管道,用于读、写两个goroutine之间的消息通信 msgChan chan []byte + //有关冲管道,用于读、写两个goroutine之间的消息通信 + msgBuffChan chan []byte } //创建连接的方法 -func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{ +func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{ + //初始化Conn属性 c := &Connection{ + TcpServer:server, Conn: conn, ConnID: connID, isClosed: false, MsgHandler: msgHandler, ExitBuffChan: make(chan bool, 1), msgChan:make(chan []byte), - // SendBuffChan: make(chan []byte, 512), + msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen), } + //将新创建的Conn添加到链接管理中 + c.TcpServer.GetConnMgr().Add(c) return c } @@ -44,9 +52,8 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) 写消息Goroutine, 用户将数据发送给客户端 */ func (c *Connection) StartWriter() { - - defer fmt.Println(c.RemoteAddr().String(), " conn Writer exit!") - defer c.Stop() + fmt.Println("[Writer Goroutine is running]") + defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]") for { select { @@ -56,6 +63,16 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) fmt.Println("Send Data error:, ", err, " Conn Writer exit") return } + case data, ok:= <-c.msgBuffChan: + if ok { + //有数据要写给客户端 + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit") + return + } + } else { + fmt.Println("msgBuffChan is Closed") + } case <- c.ExitBuffChan: //conn已经关闭 return @@ -68,9 +85,8 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) 读消息Goroutine,用于从客户端中读取数据 */ func (c *Connection) StartReader() { - fmt.Println("Reader Goroutine is running") - defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") - defer c.Stop() + fmt.Println("[Reader Goroutine is running]") + defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]") for { // 创建拆包解包的对象 @@ -80,16 +96,14 @@ func (c *Connection) StartReader() { headData := make([]byte, dp.GetHeadLen()) if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { fmt.Println("read msg head error ", err) - c.ExitBuffChan <- true - continue + break } //拆包,得到msgid 和 datalen 放在msg中 msg , err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error ", err) - c.ExitBuffChan <- true - continue + break } //根据 dataLen 读取 data,放在msg.Data中 @@ -98,8 +112,7 @@ func (c *Connection) StartReader() { data = make([]byte, msg.GetDataLen()) if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { fmt.Println("read msg data error ", err) - c.ExitBuffChan <- true - continue + break } } msg.SetData(data) @@ -118,20 +131,28 @@ func (c *Connection) StartReader() { go c.MsgHandler.DoMsgHandler(&req) } } + + c.ExitBuffChan <- true } //启动连接,让当前连接开始工作 func (c *Connection) Start() { + //Start()函数结束的时候就应该调用Stop处理善后业务 + defer c.Stop() //1 开启用户从客户端读取数据流程的Goroutine go c.StartReader() //2 开启用于写回客户端数据流程的Goroutine go c.StartWriter() + //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法 + c.TcpServer.CallOnConnStart(c) + for { select { case <- c.ExitBuffChan: //得到退出消息,不再阻塞 + fmt.Println("Start recv ExitBuffChan...") return } } @@ -139,23 +160,25 @@ func (c *Connection) Start() { //停止连接,结束当前连接状态M func (c *Connection) Stop() { + fmt.Println("Conn Stop()...ConnID = ", c.ConnID) //1. 如果当前链接已经关闭 if c.isClosed == true { return } c.isClosed = true - //TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用 + //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用 + c.TcpServer.CallOnConnStop(c) // 关闭socket链接 c.Conn.Close() - //通知从缓冲队列读数据的业务,该链接已经关闭 - c.ExitBuffChan <- true + //将链接从连接管理器中删除 + c.TcpServer.GetConnMgr().Remove(c) //关闭该链接全部管道 close(c.ExitBuffChan) - //close(c.SendBuffChan) + close(c.msgBuffChan) } //从当前连接获取原始的socket TCPConn @@ -190,4 +213,22 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { c.msgChan <- msg return nil -} \ No newline at end of file +} + +func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { + if c.isClosed == true { + return errors.New("Connection closed when send buff msg") + } + //将data封包,并且发送 + dp := NewDataPack() + msg, err := dp.Pack(NewMsgPackage(msgId, data)) + if err != nil { + fmt.Println("Pack error msg id = ", msgId) + return errors.New("Pack error msg ") + } + + //写回客户端 + c.msgBuffChan <- msg + + return nil +} diff --git a/znet/server.go b/znet/server.go index 34dfcec..7f12156 100644 --- a/znet/server.go +++ b/znet/server.go @@ -19,6 +19,12 @@ type Server struct { Port int //当前Server的消息管理模块,用来绑定MsgId和对应的处理方法 msgHandler ziface.IMsgHandle + //当前Server的链接管理器 + ConnMgr ziface.IConnManager + //该Server的连接创建时Hook函数 + OnConnStart func(conn ziface.IConnection) + //该Server的连接断开时的Hook函数 + OnConnStop func(conn ziface.IConnection) } /* @@ -33,6 +39,7 @@ func NewServer () ziface.IServer { IP:utils.GlobalObject.Host, Port:utils.GlobalObject.TcpPort, msgHandler: NewMsgHandle(), + ConnMgr:NewConnManager(), } return s } @@ -81,10 +88,14 @@ func (s *Server) Start() { continue } - //3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接 + //3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接 + if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn { + conn.Close() + continue + } //3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的 - dealConn := NewConntion(conn, cid, s.msgHandler) + dealConn := NewConntion(s, conn, cid, s.msgHandler) cid ++ //3.4 启动当前链接的处理业务 @@ -113,4 +124,38 @@ func (s *Server)AddRouter(msgId uint32, router ziface.IRouter) { s.msgHandler.AddRouter(msgId, router) } +//得到链接管理 +func (s *Server) GetConnMgr() ziface.IConnManager { + return s.ConnMgr +} + +//设置该Server的连接创建时Hook函数 +func (s *Server) SetOnConnStart(hookFunc func (ziface.IConnection)) { + s.OnConnStart = hookFunc +} + +//设置该Server的连接断开时的Hook函数 +func (s *Server) SetOnConnStop(hookFunc func (ziface.IConnection)) { + s.OnConnStop = hookFunc +} + +//调用连接OnConnStart Hook函数 +func (s *Server) CallOnConnStart(conn ziface.IConnection) { + if s.OnConnStart != nil { + s.OnConnStart(conn) + } +} + +//调用连接OnConnStop Hook函数 +func (s *Server) CallOnConnStop(conn ziface.IConnection) { + if s.OnConnStop != nil { + s.OnConnStop(conn) + } +} + +//捕捉信号处理函数,对服务器退出平滑处理 +func (s *Server) WaitSignal() { + +} +