You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

277 lines
6.4 KiB

  1. package znet
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sync"
  9. "github.com/aceld/zinx/utils"
  10. "github.com/aceld/zinx/ziface"
  11. )
  12. type Connection struct {
  13. //当前Conn属于哪个Server
  14. TcpServer ziface.IServer
  15. //当前连接的socket TCP套接字
  16. Conn *net.TCPConn
  17. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  18. ConnID uint32
  19. //消息管理MsgId和对应处理方法的消息管理模块
  20. MsgHandler ziface.IMsgHandle
  21. //告知该链接已经退出/停止的channel
  22. ctx context.Context
  23. cancel context.CancelFunc
  24. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  25. msgChan chan []byte
  26. //有缓冲管道,用于读、写两个goroutine之间的消息通信
  27. msgBuffChan chan []byte
  28. sync.RWMutex
  29. //链接属性
  30. property map[string]interface{}
  31. ////保护当前property的锁
  32. propertyLock sync.Mutex
  33. //当前连接的关闭状态
  34. isClosed bool
  35. }
  36. //创建连接的方法
  37. func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
  38. //初始化Conn属性
  39. c := &Connection{
  40. TcpServer: server,
  41. Conn: conn,
  42. ConnID: connID,
  43. isClosed: false,
  44. MsgHandler: msgHandler,
  45. msgChan: make(chan []byte),
  46. msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
  47. property: make(map[string]interface{}),
  48. }
  49. //将新创建的Conn添加到链接管理中
  50. c.TcpServer.GetConnMgr().Add(c)
  51. return c
  52. }
  53. /*
  54. 写消息Goroutine 用户将数据发送给客户端
  55. */
  56. func (c *Connection) StartWriter() {
  57. fmt.Println("[Writer Goroutine is running]")
  58. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
  59. for {
  60. select {
  61. case data := <-c.msgChan:
  62. //有数据要写给客户端
  63. if _, err := c.Conn.Write(data); err != nil {
  64. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  65. return
  66. }
  67. //fmt.Printf("Send data succ! data = %+v\n", data)
  68. case data, ok := <-c.msgBuffChan:
  69. if ok {
  70. //有数据要写给客户端
  71. if _, err := c.Conn.Write(data); err != nil {
  72. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
  73. return
  74. }
  75. } else {
  76. fmt.Println("msgBuffChan is Closed")
  77. break
  78. }
  79. case <-c.ctx.Done():
  80. return
  81. }
  82. }
  83. }
  84. /*
  85. 读消息Goroutine用于从客户端中读取数据
  86. */
  87. func (c *Connection) StartReader() {
  88. fmt.Println("[Reader Goroutine is running]")
  89. defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
  90. defer c.Stop()
  91. for {
  92. select {
  93. case <-c.ctx.Done():
  94. return
  95. default:
  96. // 创建拆包解包的对象
  97. dp := NewDataPack()
  98. //读取客户端的Msg head
  99. headData := make([]byte, dp.GetHeadLen())
  100. if _, err := io.ReadFull(c.Conn, headData); err != nil {
  101. fmt.Println("read msg head error ", err)
  102. return
  103. }
  104. //fmt.Printf("read headData %+v\n", headData)
  105. //拆包,得到msgid 和 datalen 放在msg中
  106. msg, err := dp.Unpack(headData)
  107. if err != nil {
  108. fmt.Println("unpack error ", err)
  109. return
  110. }
  111. //根据 dataLen 读取 data,放在msg.Data中
  112. var data []byte
  113. if msg.GetDataLen() > 0 {
  114. data = make([]byte, msg.GetDataLen())
  115. if _, err := io.ReadFull(c.Conn, data); err != nil {
  116. fmt.Println("read msg data error ", err)
  117. return
  118. }
  119. }
  120. msg.SetData(data)
  121. //得到当前客户端请求的Request数据
  122. req := Request{
  123. conn: c,
  124. msg: msg,
  125. }
  126. if utils.GlobalObject.WorkerPoolSize > 0 {
  127. //已经启动工作池机制,将消息交给Worker处理
  128. c.MsgHandler.SendMsgToTaskQueue(&req)
  129. } else {
  130. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  131. go c.MsgHandler.DoMsgHandler(&req)
  132. }
  133. }
  134. }
  135. }
  136. //启动连接,让当前连接开始工作
  137. func (c *Connection) Start() {
  138. c.ctx, c.cancel = context.WithCancel(context.Background())
  139. //1 开启用户从客户端读取数据流程的Goroutine
  140. go c.StartReader()
  141. //2 开启用于写回客户端数据流程的Goroutine
  142. go c.StartWriter()
  143. //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
  144. c.TcpServer.CallOnConnStart(c)
  145. }
  146. //停止连接,结束当前连接状态M
  147. func (c *Connection) Stop() {
  148. fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
  149. c.Lock()
  150. defer c.Unlock()
  151. //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  152. c.TcpServer.CallOnConnStop(c)
  153. //如果当前链接已经关闭
  154. if c.isClosed == true {
  155. return
  156. }
  157. c.isClosed = true
  158. // 关闭socket链接
  159. c.Conn.Close()
  160. //关闭Writer
  161. c.cancel()
  162. //将链接从连接管理器中删除
  163. c.TcpServer.GetConnMgr().Remove(c)
  164. //关闭该链接全部管道
  165. close(c.msgBuffChan)
  166. }
  167. //从当前连接获取原始的socket TCPConn
  168. func (c *Connection) GetTCPConnection() *net.TCPConn {
  169. return c.Conn
  170. }
  171. //获取当前连接ID
  172. func (c *Connection) GetConnID() uint32 {
  173. return c.ConnID
  174. }
  175. //获取远程客户端地址信息
  176. func (c *Connection) RemoteAddr() net.Addr {
  177. return c.Conn.RemoteAddr()
  178. }
  179. //直接将Message数据发送数据给远程的TCP客户端
  180. func (c *Connection) SendMsg(msgId uint32, data []byte) error {
  181. c.RLock()
  182. if c.isClosed == true {
  183. c.RUnlock()
  184. return errors.New("connection closed when send msg")
  185. }
  186. c.RUnlock()
  187. //将data封包,并且发送
  188. dp := NewDataPack()
  189. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  190. if err != nil {
  191. fmt.Println("Pack error msg id = ", msgId)
  192. return errors.New("Pack error msg ")
  193. }
  194. //写回客户端
  195. c.msgChan <- msg
  196. return nil
  197. }
  198. func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
  199. c.RLock()
  200. if c.isClosed == true {
  201. c.RUnlock()
  202. return errors.New("Connection closed when send buff msg")
  203. }
  204. c.RUnlock()
  205. //将data封包,并且发送
  206. dp := NewDataPack()
  207. msg, err := dp.Pack(NewMsgPackage(msgId, data))
  208. if err != nil {
  209. fmt.Println("Pack error msg id = ", msgId)
  210. return errors.New("Pack error msg ")
  211. }
  212. //写回客户端
  213. c.msgBuffChan <- msg
  214. return nil
  215. }
  216. //设置链接属性
  217. func (c *Connection) SetProperty(key string, value interface{}) {
  218. c.propertyLock.Lock()
  219. defer c.propertyLock.Unlock()
  220. c.property[key] = value
  221. }
  222. //获取链接属性
  223. func (c *Connection) GetProperty(key string) (interface{}, error) {
  224. c.propertyLock.Lock()
  225. defer c.propertyLock.Unlock()
  226. if value, ok := c.property[key]; ok {
  227. return value, nil
  228. } else {
  229. return nil, errors.New("no property found")
  230. }
  231. }
  232. //移除链接属性
  233. func (c *Connection) RemoveProperty(key string) {
  234. c.propertyLock.Lock()
  235. defer c.propertyLock.Unlock()
  236. delete(c.property, key)
  237. }