You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
6.2 KiB

  1. package znet
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sync"
  8. "zinx/utils"
  9. "zinx/ziface"
  10. )
  11. type Connection struct {
  12. //当前Conn属于哪个Server
  13. TcpServer ziface.IServer
  14. //当前连接的socket TCP套接字
  15. Conn *net.TCPConn
  16. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  17. ConnID uint32
  18. //当前连接的关闭状态
  19. isClosed bool
  20. //消息管理MsgId和对应处理方法的消息管理模块
  21. MsgHandler ziface.IMsgHandle
  22. //告知该链接已经退出/停止的channel
  23. ExitBuffChan chan bool
  24. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  25. msgChan chan []byte
  26. //有关冲管道,用于读、写两个goroutine之间的消息通信
  27. msgBuffChan chan []byte
  28. //链接属性
  29. property map[string]interface{}
  30. //保护链接属性修改的锁
  31. propertyLock sync.RWMutex
  32. }
  33. //创建连接的方法
  34. func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
  35. //初始化Conn属性
  36. c := &Connection{
  37. TcpServer: server,
  38. Conn: conn,
  39. ConnID: connID,
  40. isClosed: false,
  41. MsgHandler: msgHandler,
  42. ExitBuffChan: make(chan bool, 1),
  43. msgChan: make(chan []byte),
  44. msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
  45. property: make(map[string]interface{}),
  46. }
  47. //将新创建的Conn添加到链接管理中
  48. c.TcpServer.GetConnMgr().Add(c)
  49. return c
  50. }
  51. /*
  52. 写消息Goroutine 用户将数据发送给客户端
  53. */
  54. func (c *Connection) StartWriter() {
  55. fmt.Println("[Writer Goroutine is running]")
  56. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
  57. for {
  58. select {
  59. case data := <-c.msgChan:
  60. //有数据要写给客户端
  61. if _, err := c.Conn.Write(data); err != nil {
  62. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  63. return
  64. }
  65. fmt.Printf("Send data succ! data = %+v\n", data)
  66. case data, ok := <-c.msgBuffChan:
  67. if ok {
  68. //有数据要写给客户端
  69. if _, err := c.Conn.Write(data); err != nil {
  70. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
  71. return
  72. }
  73. } else {
  74. break
  75. fmt.Println("msgBuffChan is Closed")
  76. }
  77. case <-c.ExitBuffChan:
  78. return
  79. }
  80. }
  81. }
  82. /*
  83. 读消息Goroutine用于从客户端中读取数据
  84. */
  85. func (c *Connection) StartReader() {
  86. fmt.Println("[Reader Goroutine is running]")
  87. defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
  88. defer c.Stop()
  89. for {
  90. // 创建拆包解包的对象
  91. dp := NewDataPack()
  92. //读取客户端的Msg head
  93. headData := make([]byte, dp.GetHeadLen())
  94. if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
  95. fmt.Println("read msg head error ", err)
  96. break
  97. }
  98. fmt.Printf("read headData %+v\n", headData)
  99. //拆包,得到msgid 和 datalen 放在msg中
  100. msg, err := dp.Unpack(headData)
  101. if err != nil {
  102. fmt.Println("unpack error ", err)
  103. break
  104. }
  105. //根据 dataLen 读取 data,放在msg.Data中
  106. var data []byte
  107. if msg.GetDataLen() > 0 {
  108. data = make([]byte, msg.GetDataLen())
  109. if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
  110. fmt.Println("read msg data error ", err)
  111. break
  112. }
  113. }
  114. msg.SetData(data)
  115. //得到当前客户端请求的Request数据
  116. req := Request{
  117. conn: c,
  118. msg: msg,
  119. }
  120. if utils.GlobalObject.WorkerPoolSize > 0 {
  121. //已经启动工作池机制,将消息交给Worker处理
  122. c.MsgHandler.SendMsgToTaskQueue(&req)
  123. } else {
  124. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  125. go c.MsgHandler.DoMsgHandler(&req)
  126. }
  127. }
  128. }
  129. //启动连接,让当前连接开始工作
  130. func (c *Connection) Start() {
  131. //1 开启用户从客户端读取数据流程的Goroutine
  132. go c.StartReader()
  133. //2 开启用于写回客户端数据流程的Goroutine
  134. go c.StartWriter()
  135. //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
  136. c.TcpServer.CallOnConnStart(c)
  137. }
  138. //停止连接,结束当前连接状态M
  139. func (c *Connection) Stop() {
  140. fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
  141. //如果当前链接已经关闭
  142. if c.isClosed == true {
  143. return
  144. }
  145. c.isClosed = true
  146. //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  147. c.TcpServer.CallOnConnStop(c)
  148. // 关闭socket链接
  149. c.Conn.Close()
  150. //关闭Writer
  151. c.ExitBuffChan <- true
  152. //将链接从连接管理器中删除
  153. c.TcpServer.GetConnMgr().Remove(c)
  154. //关闭该链接全部管道
  155. close(c.ExitBuffChan)
  156. close(c.msgBuffChan)
  157. }
  158. //从当前连接获取原始的socket TCPConn
  159. func (c *Connection) GetTCPConnection() *net.TCPConn {
  160. return c.Conn
  161. }
  162. //获取当前连接ID
  163. func (c *Connection) GetConnID() uint32 {
  164. return c.ConnID
  165. }
  166. //获取远程客户端地址信息
  167. func (c *Connection) RemoteAddr() net.Addr {
  168. return c.Conn.RemoteAddr()
  169. }
  170. //直接将Message数据发送数据给远程的TCP客户端
  171. func (c *Connection) SendMsg(msgId uint32, data []byte) error {
  172. if c.isClosed == true {
  173. return errors.New("Connection closed when send msg")
  174. }
  175. //将data封包,并且发送
  176. dp := NewDataPack()
  177. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  178. if err != nil {
  179. fmt.Println("Pack error msg id = ", msgId)
  180. return errors.New("Pack error msg ")
  181. }
  182. //写回客户端
  183. c.msgChan <- msg
  184. return nil
  185. }
  186. func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
  187. if c.isClosed == true {
  188. return errors.New("Connection closed when send buff msg")
  189. }
  190. //将data封包,并且发送
  191. dp := NewDataPack()
  192. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  193. if err != nil {
  194. fmt.Println("Pack error msg id = ", msgId)
  195. return errors.New("Pack error msg ")
  196. }
  197. //写回客户端
  198. c.msgBuffChan <- msg
  199. return nil
  200. }
  201. //设置链接属性
  202. func (c *Connection) SetProperty(key string, value interface{}) {
  203. c.propertyLock.Lock()
  204. defer c.propertyLock.Unlock()
  205. c.property[key] = value
  206. }
  207. //获取链接属性
  208. func (c *Connection) GetProperty(key string) (interface{}, error) {
  209. c.propertyLock.RLock()
  210. defer c.propertyLock.RUnlock()
  211. if value, ok := c.property[key]; ok {
  212. return value, nil
  213. } else {
  214. return nil, errors.New("no property found")
  215. }
  216. }
  217. //移除链接属性
  218. func (c *Connection) RemoveProperty(key string) {
  219. c.propertyLock.Lock()
  220. defer c.propertyLock.Unlock()
  221. delete(c.property, key)
  222. }