You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

222 lines
5.4 KiB

  1. package znet
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "zinx/utils"
  8. "zinx/ziface"
  9. )
  10. type Connection struct {
  11. //当前Conn属于哪个Server
  12. TcpServer ziface.IServer
  13. //当前连接的socket TCP套接字
  14. Conn *net.TCPConn
  15. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  16. ConnID uint32
  17. //当前连接的关闭状态
  18. isClosed bool
  19. //消息管理MsgId和对应处理方法的消息管理模块
  20. MsgHandler ziface.IMsgHandle
  21. //告知该链接已经退出/停止的channel
  22. ExitBuffChan chan bool
  23. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  24. msgChan chan []byte
  25. //有关冲管道,用于读、写两个goroutine之间的消息通信
  26. msgBuffChan chan []byte
  27. }
  28. //创建连接的方法
  29. func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
  30. //初始化Conn属性
  31. c := &Connection{
  32. TcpServer:server,
  33. Conn: conn,
  34. ConnID: connID,
  35. isClosed: false,
  36. MsgHandler: msgHandler,
  37. ExitBuffChan: make(chan bool, 1),
  38. msgChan:make(chan []byte),
  39. msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
  40. }
  41. //将新创建的Conn添加到链接管理中
  42. c.TcpServer.GetConnMgr().Add(c)
  43. return c
  44. }
  45. /*
  46. 写消息Goroutine 用户将数据发送给客户端
  47. */
  48. func (c *Connection) StartWriter() {
  49. fmt.Println("[Writer Goroutine is running]")
  50. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
  51. for {
  52. select {
  53. case data := <-c.msgChan:
  54. //有数据要写给客户端
  55. if _, err := c.Conn.Write(data); err != nil {
  56. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  57. return
  58. }
  59. case data, ok:= <-c.msgBuffChan:
  60. if ok {
  61. //有数据要写给客户端
  62. if _, err := c.Conn.Write(data); err != nil {
  63. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
  64. return
  65. }
  66. } else {
  67. break
  68. fmt.Println("msgBuffChan is Closed")
  69. }
  70. case <-c.ExitBuffChan:
  71. return
  72. }
  73. }
  74. }
  75. /*
  76. 读消息Goroutine用于从客户端中读取数据
  77. */
  78. func (c *Connection) StartReader() {
  79. fmt.Println("[Reader Goroutine is running]")
  80. defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
  81. defer c.Stop()
  82. for {
  83. // 创建拆包解包的对象
  84. dp := NewDataPack()
  85. //读取客户端的Msg head
  86. headData := make([]byte, dp.GetHeadLen())
  87. if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
  88. fmt.Println("read msg head error ", err)
  89. break
  90. }
  91. //拆包,得到msgid 和 datalen 放在msg中
  92. msg , err := dp.Unpack(headData)
  93. if err != nil {
  94. fmt.Println("unpack error ", err)
  95. break
  96. }
  97. //根据 dataLen 读取 data,放在msg.Data中
  98. var data []byte
  99. if msg.GetDataLen() > 0 {
  100. data = make([]byte, msg.GetDataLen())
  101. if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
  102. fmt.Println("read msg data error ", err)
  103. break
  104. }
  105. }
  106. msg.SetData(data)
  107. //得到当前客户端请求的Request数据
  108. req := Request{
  109. conn:c,
  110. msg:msg,
  111. }
  112. if utils.GlobalObject.WorkerPoolSize > 0 {
  113. //已经启动工作池机制,将消息交给Worker处理
  114. c.MsgHandler.SendMsgToTaskQueue(&req)
  115. } else {
  116. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  117. go c.MsgHandler.DoMsgHandler(&req)
  118. }
  119. }
  120. }
  121. //启动连接,让当前连接开始工作
  122. func (c *Connection) Start() {
  123. //1 开启用户从客户端读取数据流程的Goroutine
  124. go c.StartReader()
  125. //2 开启用于写回客户端数据流程的Goroutine
  126. go c.StartWriter()
  127. //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
  128. c.TcpServer.CallOnConnStart(c)
  129. }
  130. //停止连接,结束当前连接状态M
  131. func (c *Connection) Stop() {
  132. fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
  133. //如果当前链接已经关闭
  134. if c.isClosed == true {
  135. return
  136. }
  137. c.isClosed = true
  138. //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  139. c.TcpServer.CallOnConnStop(c)
  140. // 关闭socket链接
  141. c.Conn.Close()
  142. //关闭Writer
  143. c.ExitBuffChan <- true
  144. //将链接从连接管理器中删除
  145. c.TcpServer.GetConnMgr().Remove(c)
  146. //关闭该链接全部管道
  147. close(c.ExitBuffChan)
  148. close(c.msgBuffChan)
  149. }
  150. //从当前连接获取原始的socket TCPConn
  151. func (c *Connection) GetTCPConnection() *net.TCPConn {
  152. return c.Conn
  153. }
  154. //获取当前连接ID
  155. func (c *Connection) GetConnID() uint32{
  156. return c.ConnID
  157. }
  158. //获取远程客户端地址信息
  159. func (c *Connection) RemoteAddr() net.Addr {
  160. return c.Conn.RemoteAddr()
  161. }
  162. //直接将Message数据发送数据给远程的TCP客户端
  163. func (c *Connection) SendMsg(msgId uint32, data []byte) error {
  164. if c.isClosed == true {
  165. return errors.New("Connection closed when send msg")
  166. }
  167. //将data封包,并且发送
  168. dp := NewDataPack()
  169. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  170. if err != nil {
  171. fmt.Println("Pack error msg id = ", msgId)
  172. return errors.New("Pack error msg ")
  173. }
  174. //写回客户端
  175. c.msgChan <- msg
  176. return nil
  177. }
  178. func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
  179. if c.isClosed == true {
  180. return errors.New("Connection closed when send buff msg")
  181. }
  182. //将data封包,并且发送
  183. dp := NewDataPack()
  184. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  185. if err != nil {
  186. fmt.Println("Pack error msg id = ", msgId)
  187. return errors.New("Pack error msg ")
  188. }
  189. //写回客户端
  190. c.msgBuffChan <- msg
  191. return nil
  192. }