diff --git a/znet/connection.go b/znet/connection.go index 115c665..459081f 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -19,6 +19,8 @@ type Connection struct { MsgHandler ziface.IMsgHandle //告知该链接已经退出/停止的channel ExitBuffChan chan bool + //无缓冲管道,用于读、写两个goroutine之间的消息通信 + msgChan chan []byte //给缓冲队列发送数据的channel, // 如果向缓冲队列发送数据,那么把数据发送到这个channel下 // SendBuffChan chan []byte @@ -34,12 +36,40 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) isClosed: false, MsgHandler: msgHandler, ExitBuffChan: make(chan bool, 1), + msgChan:make(chan []byte), // SendBuffChan: make(chan []byte, 512), } return c } +/* + 写消息Goroutine, 用户将数据发送给客户端 + */ + func (c *Connection) StartWriter() { + + defer fmt.Println(c.RemoteAddr().String(), " conn Writer exit!") + defer c.Stop() + + for { + select { + case data := <-c.msgChan: + //有数据要写给客户端 + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send Data error:, ", err, " Conn Writer exit") + return + } + case <- c.ExitBuffChan: + //conn已经关闭 + return + } + } + } + + +/* + 读消息Goroutine,用于从客户端中读取数据 + */ func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running") defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!") @@ -90,8 +120,10 @@ func (c *Connection) StartReader() { //启动连接,让当前连接开始工作 func (c *Connection) Start() { - //开启处理该链接读取到客户端数据之后的请求业务 + //1 开启用户从客户端读取数据流程的Goroutine go c.StartReader() + //2 开启用于写回客户端数据流程的Goroutine + go c.StartWriter() for { select { @@ -100,9 +132,6 @@ func (c *Connection) Start() { return } } - - //1 开启用于写回客户端数据流程的Goroutine - //2 开启用户从客户端读取数据流程的Goroutine } //停止连接,结束当前连接状态M @@ -155,11 +184,7 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { } //写回客户端 - if _, err := c.Conn.Write(msg); err != nil { - fmt.Println("Write msg id ", msgId, " error ") - c.ExitBuffChan <- true - return errors.New("conn Write error") - } + c.msgChan <- msg return nil } diff --git a/znet/server.go b/znet/server.go index f7f649e..eaf7640 100644 --- a/znet/server.go +++ b/znet/server.go @@ -3,7 +3,6 @@ package znet import ( "fmt" "net" - "time" "zinx/utils" "zinx/ziface" ) @@ -103,9 +102,7 @@ func (s *Server) Serve() { //TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加 //阻塞,否则主Go退出, listenner的go将会退出 - for { - time.Sleep(10*time.Second) - } + select{} } //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用