From 4d032323a492089f01ac8a0c63fd2f36de939f3e Mon Sep 17 00:00:00 2001 From: aceld Date: Wed, 30 Jan 2019 10:18:31 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0Zinx-v0.2=E7=89=88=E6=9C=AC-?= =?UTF-8?q?=E7=BB=91=E5=AE=9Aconn=E4=B8=8Ehandle?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ziface/iconnnection.go | 25 ++++++++ znet/connection.go | 131 +++++++++++++++++++++++++++++++++++++++++ znet/server.go | 41 ++++++------- 3 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 ziface/iconnnection.go create mode 100644 znet/connection.go diff --git a/ziface/iconnnection.go b/ziface/iconnnection.go new file mode 100644 index 0000000..5985cea --- /dev/null +++ b/ziface/iconnnection.go @@ -0,0 +1,25 @@ +package ziface + +import "net" + +//定义连接接口 +type IConnection interface { + //启动连接,让当前连接开始工作 + Start() + //停止连接,结束当前连接状态M + Stop() + //从当前连接获取原始的socket TCPConn + GetTCPConnection() *net.TCPConn + //获取当前连接ID + GetConnID() uint32 + //获取远程客户端地址信息 + RemoteAddr() net.Addr + //直接将数据发送数据给远程的TCP客户端 + Send(data []byte) error + //将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 + SendBuff(data []byte) error +} + +//定义一个统一处理链接业务的接口 +type HandFunc func(*net.TCPConn, []byte, int) error + diff --git a/znet/connection.go b/znet/connection.go new file mode 100644 index 0000000..e4e0ebd --- /dev/null +++ b/znet/connection.go @@ -0,0 +1,131 @@ +package znet + +import ( + "fmt" + "net" + "zinx/ziface" +) + +type Connection struct { + //当前连接的socket TCP套接字 + Conn *net.TCPConn + //当前连接的ID 也可以称作为SessionID,ID全局唯一 + ConnID uint32 + //当前连接的关闭状态 + isClosed bool + + //该连接的处理方法api + handleAPI ziface.HandFunc + + //告知该链接已经退出/停止的channel + ExitBuffChan chan bool + + //给缓冲队列发送数据的channel, + // 如果向缓冲队列发送数据,那么把数据发送到这个channel下 +// SendBuffChan chan []byte + +} + + +//创建连接的方法 +func NewConntion(conn *net.TCPConn, connID uint32, callback_api ziface.HandFunc) *Connection{ + c := &Connection{ + Conn: conn, + ConnID: connID, + isClosed: false, + handleAPI: callback_api, + ExitBuffChan: make(chan bool, 1), + // SendBuffChan: make(chan []byte, 512), + } + + return c +} + +func (c *Connection) StartReader() { + fmt.Println("Reader Goroutine is running") + defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") + defer c.Stop() + + for { + //读取我们最大的数据到buf中 + buf := make([]byte, 512) + cnt, err := c.Conn.Read(buf) + if err != nil { + fmt.Println("recv buf err ", err) + c.ExitBuffChan <- true + continue + } + //调用当前链接业务 + if err := c.handleAPI(c.Conn, buf, cnt); err !=nil { + fmt.Println("connID ", c.ConnID, " handle is error") + c.ExitBuffChan <- true + return + } + } + + +} + +//启动连接,让当前连接开始工作 +func (c *Connection) Start() { + + //开启处理该链接读取到客户端数据之后的请求业务 + go c.StartReader() + + for { + select { + case <- c.ExitBuffChan: + //得到退出消息,不再阻塞 + return + } + } + + //1 开启用于写回客户端数据流程的Goroutine + //2 开启用户从客户端读取数据流程的Goroutine +} + +//停止连接,结束当前连接状态M +func (c *Connection) Stop() { + //1. 如果当前链接已经关闭 + if c.isClosed == true { + return + } + c.isClosed = true + + //TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用 + + // 关闭socket链接 + c.Conn.Close() + + //通知从缓冲队列读数据的业务,该链接已经关闭 + c.ExitBuffChan <- true + + //关闭该链接全部管道 + //close(c.ExitBuffChan) + //close(c.SendBuffChan) +} + +//从当前连接获取原始的socket TCPConn +func (c *Connection) GetTCPConnection() *net.TCPConn { + return c.Conn +} + +//获取当前连接ID +func (c *Connection) GetConnID() uint32{ + return c.ConnID +} + +//获取远程客户端地址信息 +func (c *Connection) RemoteAddr() net.Addr { + return c.Conn.RemoteAddr() +} + +//直接将数据发送数据给远程的TCP客户端 +func (c *Connection) Send(data []byte) error { + return nil +} + +//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 +func (c *Connection) SendBuff(data []byte) error { + return nil +} diff --git a/znet/server.go b/znet/server.go index bc39c36..90d53fc 100644 --- a/znet/server.go +++ b/znet/server.go @@ -1,6 +1,7 @@ package znet import ( + "errors" "fmt" "net" "time" @@ -19,6 +20,16 @@ type Server struct { Port int } +//============== 定义当前客户端链接的handle api =========== +func CallBackToClient(conn *net.TCPConn, data []byte, cnt int) error { + //回显业务 + fmt.Println("[Conn Handle] CallBackToClient ... ") + if _, err := conn.Write(data[:cnt]); err !=nil { + fmt.Println("write back buf err ", err) + return errors.New("CallBackToClient error") + } + return nil +} //============== 实现 ziface.IServer 里的全部接口方法 ======== @@ -45,6 +56,10 @@ func (s *Server) Start() { //已经监听成功 fmt.Println("start Zinx server ", s.Name, " succ, now listenning...") + //TODO server.go 应该有一个自动生成ID的方法 + var cid uint32 + cid = 0 + //3 启动server网络连接业务 for { //3.1 阻塞等待客户端建立连接请求 @@ -56,25 +71,12 @@ func (s *Server) Start() { //3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接 - //3.3 TODO Server.Start() 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的 - - //我们这里暂时做一个最大512字节的回显服务 - go func () { - //不断的循环从客户端获取数据 - for { - buf := make([]byte, 512) - cnt, err := conn.Read(buf) - if err != nil { - fmt.Println("recv buf err ", err) - continue - } - //回显 - if _, err := conn.Write(buf[:cnt]); err !=nil { - fmt.Println("write back buf err ", err) - continue - } - } - }() + //3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的 + dealConn := NewConntion(conn, cid, CallBackToClient) + cid ++ + + //3.4 启动当前链接的处理业务 + go dealConn.Start() } }() } @@ -90,7 +92,6 @@ func (s *Server) Serve() { //TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加 - //阻塞,否则主Go退出, listenner的go将会退出 for { time.Sleep(10*time.Second)