You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

284 lines
6.8 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package znet
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "sync"
  9. "github.com/aceld/zinx/utils"
  10. "github.com/aceld/zinx/ziface"
  11. )
  12. //Connection 链接
  13. type Connection struct {
  14. //当前Conn属于哪个Server
  15. TCPServer ziface.IServer
  16. //当前连接的socket TCP套接字
  17. Conn *net.TCPConn
  18. //当前连接的ID 也可以称作为SessionID,ID全局唯一
  19. ConnID uint32
  20. //消息管理MsgID和对应处理方法的消息管理模块
  21. MsgHandler ziface.IMsgHandle
  22. //告知该链接已经退出/停止的channel
  23. ctx context.Context
  24. cancel context.CancelFunc
  25. //无缓冲管道,用于读、写两个goroutine之间的消息通信
  26. msgChan chan []byte
  27. //有缓冲管道,用于读、写两个goroutine之间的消息通信
  28. msgBuffChan chan []byte
  29. sync.RWMutex
  30. //链接属性
  31. property map[string]interface{}
  32. ////保护当前property的锁
  33. propertyLock sync.Mutex
  34. //当前连接的关闭状态
  35. isClosed bool
  36. }
  37. //NewConnection 创建连接的方法
  38. func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
  39. //初始化Conn属性
  40. c := &Connection{
  41. TCPServer: server,
  42. Conn: conn,
  43. ConnID: connID,
  44. isClosed: false,
  45. MsgHandler: msgHandler,
  46. msgChan: make(chan []byte),
  47. msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
  48. property: nil,
  49. }
  50. //将新创建的Conn添加到链接管理中
  51. c.TCPServer.GetConnMgr().Add(c)
  52. return c
  53. }
  54. //StartWriter 写消息Goroutine, 用户将数据发送给客户端
  55. func (c *Connection) StartWriter() {
  56. fmt.Println("[Writer Goroutine is running]")
  57. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
  58. for {
  59. select {
  60. case data := <-c.msgChan:
  61. //有数据要写给客户端
  62. if _, err := c.Conn.Write(data); err != nil {
  63. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
  64. return
  65. }
  66. //fmt.Printf("Send data succ! data = %+v\n", data)
  67. case data, ok := <-c.msgBuffChan:
  68. if ok {
  69. //有数据要写给客户端
  70. if _, err := c.Conn.Write(data); err != nil {
  71. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
  72. return
  73. }
  74. } else {
  75. fmt.Println("msgBuffChan is Closed")
  76. break
  77. }
  78. case <-c.ctx.Done():
  79. return
  80. }
  81. }
  82. }
  83. //StartReader 读消息Goroutine,用于从客户端中读取数据
  84. func (c *Connection) StartReader() {
  85. fmt.Println("[Reader Goroutine is running]")
  86. defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
  87. defer c.Stop()
  88. // 创建拆包解包的对象
  89. for {
  90. select {
  91. case <-c.ctx.Done():
  92. return
  93. default:
  94. //读取客户端的Msg head
  95. headData := make([]byte, c.TCPServer.Packet().GetHeadLen())
  96. if _, err := io.ReadFull(c.Conn, headData); err != nil {
  97. fmt.Println("read msg head error ", err)
  98. return
  99. }
  100. //fmt.Printf("read headData %+v\n", headData)
  101. //拆包,得到msgID 和 datalen 放在msg中
  102. msg, err := c.TCPServer.Packet().Unpack(headData)
  103. if err != nil {
  104. fmt.Println("unpack error ", err)
  105. return
  106. }
  107. //根据 dataLen 读取 data,放在msg.Data中
  108. var data []byte
  109. if msg.GetDataLen() > 0 {
  110. data = make([]byte, msg.GetDataLen())
  111. if _, err := io.ReadFull(c.Conn, data); err != nil {
  112. fmt.Println("read msg data error ", err)
  113. return
  114. }
  115. }
  116. msg.SetData(data)
  117. //得到当前客户端请求的Request数据
  118. req := Request{
  119. conn: c,
  120. msg: msg,
  121. }
  122. if utils.GlobalObject.WorkerPoolSize > 0 {
  123. //已经启动工作池机制,将消息交给Worker处理
  124. c.MsgHandler.SendMsgToTaskQueue(&req)
  125. } else {
  126. //从绑定好的消息和对应的处理方法中执行对应的Handle方法
  127. go c.MsgHandler.DoMsgHandler(&req)
  128. }
  129. }
  130. }
  131. }
  132. //Start 启动连接,让当前连接开始工作
  133. func (c *Connection) Start() {
  134. c.ctx, c.cancel = context.WithCancel(context.Background())
  135. //1 开启用户从客户端读取数据流程的Goroutine
  136. go c.StartReader()
  137. //2 开启用于写回客户端数据流程的Goroutine
  138. go c.StartWriter()
  139. //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
  140. c.TCPServer.CallOnConnStart(c)
  141. }
  142. //Stop 停止连接,结束当前连接状态M
  143. func (c *Connection) Stop() {
  144. c.Lock()
  145. defer c.Unlock()
  146. c.TCPServer.CallOnConnStop(c)
  147. //如果当前链接已经关闭
  148. if c.isClosed == true {
  149. return
  150. }
  151. fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
  152. //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
  153. c.TCPServer.CallOnConnStop(c)
  154. // 关闭socket链接
  155. c.Conn.Close()
  156. //关闭Writer
  157. c.cancel()
  158. //将链接从连接管理器中删除
  159. c.TCPServer.GetConnMgr().Remove(c)
  160. //关闭该链接全部管道
  161. close(c.msgBuffChan)
  162. //设置标志位
  163. c.isClosed = true
  164. }
  165. //GetTCPConnection 从当前连接获取原始的socket TCPConn
  166. func (c *Connection) GetTCPConnection() *net.TCPConn {
  167. return c.Conn
  168. }
  169. //GetConnID 获取当前连接ID
  170. func (c *Connection) GetConnID() uint32 {
  171. return c.ConnID
  172. }
  173. //RemoteAddr 获取远程客户端地址信息
  174. func (c *Connection) RemoteAddr() net.Addr {
  175. return c.Conn.RemoteAddr()
  176. }
  177. //SendMsg 直接将Message数据发送数据给远程的TCP客户端
  178. func (c *Connection) SendMsg(msgID uint32, data []byte) error {
  179. c.RLock()
  180. defer c.RUnlock()
  181. if c.isClosed == true {
  182. return errors.New("connection closed when send msg")
  183. }
  184. //将data封包,并且发送
  185. dp := c.TCPServer.Packet()
  186. msg, err := dp.Pack(NewMsgPackage(msgID, data))
  187. if err != nil {
  188. fmt.Println("Pack error msg ID = ", msgID)
  189. return errors.New("Pack error msg ")
  190. }
  191. //写回客户端
  192. c.msgChan <- msg
  193. return nil
  194. }
  195. //SendBuffMsg 发生BuffMsg
  196. func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
  197. c.RLock()
  198. defer c.RUnlock()
  199. if c.isClosed == true {
  200. return errors.New("Connection closed when send buff msg")
  201. }
  202. //将data封包,并且发送
  203. dp := c.TCPServer.Packet()
  204. msg, err := dp.Pack(NewMsgPackage(msgID, data))
  205. if err != nil {
  206. fmt.Println("Pack error msg ID = ", msgID)
  207. return errors.New("Pack error msg ")
  208. }
  209. //写回客户端
  210. c.msgBuffChan <- msg
  211. return nil
  212. }
  213. //SetProperty 设置链接属性
  214. func (c *Connection) SetProperty(key string, value interface{}) {
  215. c.propertyLock.Lock()
  216. defer c.propertyLock.Unlock()
  217. if c.property == nil {
  218. c.property = make(map[string]interface{})
  219. }
  220. c.property[key] = value
  221. }
  222. //GetProperty 获取链接属性
  223. func (c *Connection) GetProperty(key string) (interface{}, error) {
  224. c.propertyLock.Lock()
  225. defer c.propertyLock.Unlock()
  226. if value, ok := c.property[key]; ok {
  227. return value, nil
  228. }
  229. return nil, errors.New("no property found")
  230. }
  231. //RemoveProperty 移除链接属性
  232. func (c *Connection) RemoveProperty(key string) {
  233. c.propertyLock.Lock()
  234. defer c.propertyLock.Unlock()
  235. delete(c.property, key)
  236. }
  237. //返回ctx,用于用户自定义的go程获取连接退出状态
  238. func (c *Connection) Context() context.Context {
  239. return c.ctx
  240. }