You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

255 lines
6.1 KiB

  1. package znet
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "zinx/utils"
  9. "zinx/ziface"
  10. )
  11. type Connection struct {
  12. //当前Conn属于哪个Server
  13. TcpServer ziface.IServer
  14. //当前连接的socket TCP套接字
  15. Conn *net.TCPConn
  16. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  17. ConnID uint32
  18. //当前连接的关闭状态
  19. isClosed bool
  20. //消息管理MsgId和对应处理方法的消息管理模块
  21. MsgHandler ziface.IMsgHandle
  22. //告知该链接已经退出/停止的channel
  23. ExitBuffChan chan bool
  24. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  25. msgChan chan []byte
  26. //有关冲管道,用于读、写两个goroutine之间的消息通信
  27. msgBuffChan chan []byte
  28. //链接属性
  29. property map[string]interface{}
  30. //保护链接属性修改的锁
  31. propertyLock sync.RWMutex
  32. }
  33. //创建连接的方法
  34. func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
  35. //初始化Conn属性
  36. c := &Connection{
  37. TcpServer: server,
  38. Conn: conn,
  39. ConnID: connID,
  40. isClosed: false,
  41. MsgHandler: msgHandler,
  42. ExitBuffChan: make(chan bool, 1),
  43. msgChan: make(chan []byte),
  44. msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
  45. property: make(map[string]interface{}),
  46. }
  47. //将新创建的Conn添加到链接管理中
  48. c.TcpServer.GetConnMgr().Add(c)
  49. return c
  50. }
  51. /*
  52. 写消息Goroutine 用户将数据发送给客户端
  53. */
  54. func (c *Connection) StartWriter() {
  55. fmt.Println("[Writer Goroutine is running]")
  56. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
  57. for {
  58. select {
  59. case data := <-c.msgChan:
  60. //有数据要写给客户端
  61. if _, err := c.Conn.Write(data); err != nil {
  62. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  63. return
  64. }
  65. case data, ok := <-c.msgBuffChan:
  66. if ok {
  67. //有数据要写给客户端
  68. if _, err := c.Conn.Write(data); err != nil {
  69. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
  70. return
  71. }
  72. } else {
  73. break
  74. fmt.Println("msgBuffChan is Closed")
  75. }
  76. case <-c.ExitBuffChan:
  77. return
  78. }
  79. }
  80. }
  81. /*
  82. 读消息Goroutine用于从客户端中读取数据
  83. */
  84. func (c *Connection) StartReader() {
  85. fmt.Println("[Reader Goroutine is running]")
  86. defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
  87. defer c.Stop()
  88. for {
  89. // 创建拆包解包的对象
  90. dp := NewDataPack()
  91. //读取客户端的Msg head
  92. headData := make([]byte, dp.GetHeadLen())
  93. if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
  94. fmt.Println("read msg head error ", err)
  95. break
  96. }
  97. //拆包,得到msgid 和 datalen 放在msg中
  98. msg, err := dp.Unpack(headData)
  99. if err != nil {
  100. fmt.Println("unpack error ", err)
  101. break
  102. }
  103. //根据 dataLen 读取 data,放在msg.Data中
  104. var data []byte
  105. if msg.GetDataLen() > 0 {
  106. data = make([]byte, msg.GetDataLen())
  107. if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
  108. fmt.Println("read msg data error ", err)
  109. break
  110. }
  111. }
  112. msg.SetData(data)
  113. //得到当前客户端请求的Request数据
  114. req := Request{
  115. conn: c,
  116. msg: msg,
  117. }
  118. if utils.GlobalObject.WorkerPoolSize > 0 {
  119. //已经启动工作池机制,将消息交给Worker处理
  120. c.MsgHandler.SendMsgToTaskQueue(&req)
  121. } else {
  122. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  123. go c.MsgHandler.DoMsgHandler(&req)
  124. }
  125. }
  126. }
  127. //启动连接,让当前连接开始工作
  128. func (c *Connection) Start() {
  129. //1 开启用户从客户端读取数据流程的Goroutine
  130. go c.StartReader()
  131. //2 开启用于写回客户端数据流程的Goroutine
  132. go c.StartWriter()
  133. //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
  134. c.TcpServer.CallOnConnStart(c)
  135. }
  136. //停止连接,结束当前连接状态M
  137. func (c *Connection) Stop() {
  138. fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
  139. //如果当前链接已经关闭
  140. if c.isClosed == true {
  141. return
  142. }
  143. c.isClosed = true
  144. //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  145. c.TcpServer.CallOnConnStop(c)
  146. // 关闭socket链接
  147. c.Conn.Close()
  148. //关闭Writer
  149. c.ExitBuffChan <- true
  150. //将链接从连接管理器中删除
  151. c.TcpServer.GetConnMgr().Remove(c)
  152. //关闭该链接全部管道
  153. close(c.ExitBuffChan)
  154. close(c.msgBuffChan)
  155. }
  156. //从当前连接获取原始的socket TCPConn
  157. func (c *Connection) GetTCPConnection() *net.TCPConn {
  158. return c.Conn
  159. }
  160. //获取当前连接ID
  161. func (c *Connection) GetConnID() uint32 {
  162. return c.ConnID
  163. }
  164. //获取远程客户端地址信息
  165. func (c *Connection) RemoteAddr() net.Addr {
  166. return c.Conn.RemoteAddr()
  167. }
  168. //直接将Message数据发送数据给远程的TCP客户端
  169. func (c *Connection) SendMsg(msgId uint32, data []byte) error {
  170. if c.isClosed == true {
  171. return errors.New("Connection closed when send msg")
  172. }
  173. //将data封包,并且发送
  174. dp := NewDataPack()
  175. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  176. if err != nil {
  177. fmt.Println("Pack error msg id = ", msgId)
  178. return errors.New("Pack error msg ")
  179. }
  180. //写回客户端
  181. c.msgChan <- msg
  182. return nil
  183. }
  184. func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
  185. if c.isClosed == true {
  186. return errors.New("Connection closed when send buff msg")
  187. }
  188. //将data封包,并且发送
  189. dp := NewDataPack()
  190. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  191. if err != nil {
  192. fmt.Println("Pack error msg id = ", msgId)
  193. return errors.New("Pack error msg ")
  194. }
  195. //写回客户端
  196. c.msgBuffChan <- msg
  197. return nil
  198. }
  199. //设置链接属性
  200. func (c *Connection) SetProperty(key string, value interface{}) {
  201. c.propertyLock.Lock()
  202. defer c.propertyLock.Unlock()
  203. c.property[key] = value
  204. }
  205. //获取链接属性
  206. func (c *Connection) GetProperty(key string) (interface{}, error) {
  207. c.propertyLock.RLock()
  208. defer c.propertyLock.RUnlock()
  209. if value, ok := c.property[key]; ok {
  210. return value, nil
  211. } else {
  212. return nil, errors.New("no property found")
  213. }
  214. }
  215. //移除链接属性
  216. func (c *Connection) RemoveProperty(key string) {
  217. c.propertyLock.Lock()
  218. defer c.propertyLock.Unlock()
  219. delete(c.property, key)
  220. }