You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

85 lines
2.7 KiB

  1. package znet
  2. import (
  3. "fmt"
  4. "strconv"
  5. "zinx/utils"
  6. "zinx/ziface"
  7. )
  8. type MsgHandle struct {
  9. Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
  10. WorkerPoolSize uint32 //业务工作Worker池的数量
  11. TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
  12. }
  13. func NewMsgHandle() *MsgHandle {
  14. return &MsgHandle{
  15. Apis: make(map[uint32]ziface.IRouter),
  16. WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
  17. //一个worker对应一个queue
  18. TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
  19. }
  20. }
  21. //将消息交给TaskQueue,由worker进行处理
  22. func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
  23. //根据ConnID来分配当前的连接应该由哪个worker负责处理
  24. //轮询的平均分配法则
  25. //得到需要处理此条连接的workerID
  26. workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
  27. //fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
  28. //将请求消息发送给任务队列
  29. mh.TaskQueue[workerID] <- request
  30. }
  31. //马上以非阻塞方式处理消息
  32. func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
  33. handler, ok := mh.Apis[request.GetMsgID()]
  34. if !ok {
  35. fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
  36. return
  37. }
  38. //执行对应处理方法
  39. handler.PreHandle(request)
  40. handler.Handle(request)
  41. handler.PostHandle(request)
  42. }
  43. //为消息添加具体的处理逻辑
  44. func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
  45. //1 判断当前msg绑定的API处理方法是否已经存在
  46. if _, ok := mh.Apis[msgId]; ok {
  47. panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
  48. }
  49. //2 添加msg与api的绑定关系
  50. mh.Apis[msgId] = router
  51. fmt.Println("Add api msgId = ", msgId)
  52. }
  53. //启动一个Worker工作流程
  54. func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
  55. fmt.Println("Worker ID = ", workerID, " is started.")
  56. //不断的等待队列中的消息
  57. for {
  58. select {
  59. //有消息则取出队列的Request,并执行绑定的业务方法
  60. case request := <-taskQueue:
  61. mh.DoMsgHandler(request)
  62. }
  63. }
  64. }
  65. //启动worker工作池
  66. func (mh *MsgHandle) StartWorkerPool() {
  67. //遍历需要启动worker的数量,依此启动
  68. for i:= 0; i < int(mh.WorkerPoolSize); i++ {
  69. //一个worker被启动
  70. //给当前worker对应的任务队列开辟空间
  71. mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
  72. //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
  73. go mh.StartOneWorker(i, mh.TaskQueue[i])
  74. }
  75. }