You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

195 lines
4.5 KiB

  1. package znet
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "zinx/ziface"
  8. )
  9. type Connection struct {
  10. //当前连接的socket TCP套接字
  11. Conn *net.TCPConn
  12. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  13. ConnID uint32
  14. //当前连接的关闭状态
  15. isClosed bool
  16. //消息管理MsgId和对应处理方法的消息管理模块
  17. MsgHandler ziface.IMsgHandle
  18. //告知该链接已经退出/停止的channel
  19. ExitBuffChan chan bool
  20. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  21. msgChan chan []byte
  22. //给缓冲队列发送数据的channel,
  23. // 如果向缓冲队列发送数据,那么把数据发送到这个channel下
  24. // SendBuffChan chan []byte
  25. }
  26. //创建连接的方法
  27. func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
  28. c := &Connection{
  29. Conn: conn,
  30. ConnID: connID,
  31. isClosed: false,
  32. MsgHandler: msgHandler,
  33. ExitBuffChan: make(chan bool, 1),
  34. msgChan:make(chan []byte),
  35. // SendBuffChan: make(chan []byte, 512),
  36. }
  37. return c
  38. }
  39. /*
  40. 写消息Goroutine 用户将数据发送给客户端
  41. */
  42. func (c *Connection) StartWriter() {
  43. defer fmt.Println(c.RemoteAddr().String(), " conn Writer exit!")
  44. defer c.Stop()
  45. for {
  46. select {
  47. case data := <-c.msgChan:
  48. //有数据要写给客户端
  49. if _, err := c.Conn.Write(data); err != nil {
  50. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  51. return
  52. }
  53. case <- c.ExitBuffChan:
  54. //conn已经关闭
  55. return
  56. }
  57. }
  58. }
  59. /*
  60. 读消息Goroutine用于从客户端中读取数据
  61. */
  62. func (c *Connection) StartReader() {
  63. fmt.Println("Reader Goroutine is running")
  64. defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
  65. defer c.Stop()
  66. for {
  67. // 创建拆包解包的对象
  68. dp := NewDataPack()
  69. //读取客户端的Msg head
  70. headData := make([]byte, dp.GetHeadLen())
  71. if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
  72. fmt.Println("read msg head error ", err)
  73. c.ExitBuffChan <- true
  74. continue
  75. }
  76. //拆包,得到msgid 和 datalen 放在msg中
  77. msg , err := dp.Unpack(headData)
  78. if err != nil {
  79. fmt.Println("unpack error ", err)
  80. c.ExitBuffChan <- true
  81. continue
  82. }
  83. //根据 dataLen 读取 data,放在msg.Data中
  84. var data []byte
  85. if msg.GetDataLen() > 0 {
  86. data = make([]byte, msg.GetDataLen())
  87. if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
  88. fmt.Println("read msg data error ", err)
  89. c.ExitBuffChan <- true
  90. continue
  91. }
  92. }
  93. msg.SetData(data)
  94. //得到当前客户端请求的Request数据
  95. req := Request{
  96. conn:c,
  97. msg:msg,
  98. }
  99. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  100. go c.MsgHandler.DoMsgHandler(&req)
  101. }
  102. }
  103. //启动连接,让当前连接开始工作
  104. func (c *Connection) Start() {
  105. //1 开启用户从客户端读取数据流程的Goroutine
  106. go c.StartReader()
  107. //2 开启用于写回客户端数据流程的Goroutine
  108. go c.StartWriter()
  109. for {
  110. select {
  111. case <- c.ExitBuffChan:
  112. //得到退出消息,不再阻塞
  113. return
  114. }
  115. }
  116. }
  117. //停止连接,结束当前连接状态M
  118. func (c *Connection) Stop() {
  119. //1. 如果当前链接已经关闭
  120. if c.isClosed == true {
  121. return
  122. }
  123. c.isClosed = true
  124. //TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  125. // 关闭socket链接
  126. c.Conn.Close()
  127. //通知从缓冲队列读数据的业务,该链接已经关闭
  128. c.ExitBuffChan <- true
  129. //关闭该链接全部管道
  130. close(c.ExitBuffChan)
  131. //close(c.SendBuffChan)
  132. }
  133. //从当前连接获取原始的socket TCPConn
  134. func (c *Connection) GetTCPConnection() *net.TCPConn {
  135. return c.Conn
  136. }
  137. //获取当前连接ID
  138. func (c *Connection) GetConnID() uint32{
  139. return c.ConnID
  140. }
  141. //获取远程客户端地址信息
  142. func (c *Connection) RemoteAddr() net.Addr {
  143. return c.Conn.RemoteAddr()
  144. }
  145. //直接将Message数据发送数据给远程的TCP客户端
  146. func (c *Connection) SendMsg(msgId uint32, data []byte) error {
  147. if c.isClosed == true {
  148. return errors.New("Connection closed when send msg")
  149. }
  150. //将data封包,并且发送
  151. dp := NewDataPack()
  152. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  153. if err != nil {
  154. fmt.Println("Pack error msg id = ", msgId)
  155. return errors.New("Pack error msg ")
  156. }
  157. //写回客户端
  158. c.msgChan <- msg
  159. return nil
  160. }
  161. //将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端
  162. //func (c *Connection) SendBuff(data []byte) error {
  163. // return nil
  164. //}