You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

88 lines
2.8 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package znet
  2. import (
  3. "fmt"
  4. "strconv"
  5. "github.com/aceld/zinx/utils"
  6. "github.com/aceld/zinx/ziface"
  7. )
  8. // MsgHandle -
  9. type MsgHandle struct {
  10. Apis map[uint32]ziface.IRouter //存放每个MsgID 所对应的处理方法的map属性
  11. WorkerPoolSize uint32 //业务工作Worker池的数量
  12. TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
  13. }
  14. //NewMsgHandle 创建MsgHandle
  15. func NewMsgHandle() *MsgHandle {
  16. return &MsgHandle{
  17. Apis: make(map[uint32]ziface.IRouter),
  18. WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
  19. //一个worker对应一个queue
  20. TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
  21. }
  22. }
  23. //SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理
  24. func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
  25. //根据ConnID来分配当前的连接应该由哪个worker负责处理
  26. //轮询的平均分配法则
  27. //得到需要处理此条连接的workerID
  28. workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
  29. //fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
  30. //将请求消息发送给任务队列
  31. mh.TaskQueue[workerID] <- request
  32. }
  33. //DoMsgHandler 马上以非阻塞方式处理消息
  34. func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
  35. handler, ok := mh.Apis[request.GetMsgID()]
  36. if !ok {
  37. fmt.Println("api msgID = ", request.GetMsgID(), " is not FOUND!")
  38. return
  39. }
  40. //执行对应处理方法
  41. handler.PreHandle(request)
  42. handler.Handle(request)
  43. handler.PostHandle(request)
  44. }
  45. //AddRouter 为消息添加具体的处理逻辑
  46. func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) {
  47. //1 判断当前msg绑定的API处理方法是否已经存在
  48. if _, ok := mh.Apis[msgID]; ok {
  49. panic("repeated api , msgID = " + strconv.Itoa(int(msgID)))
  50. }
  51. //2 添加msg与api的绑定关系
  52. mh.Apis[msgID] = router
  53. fmt.Println("Add api msgID = ", msgID)
  54. }
  55. //StartOneWorker 启动一个Worker工作流程
  56. func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
  57. fmt.Println("Worker ID = ", workerID, " is started.")
  58. //不断的等待队列中的消息
  59. for {
  60. select {
  61. //有消息则取出队列的Request,并执行绑定的业务方法
  62. case request := <-taskQueue:
  63. mh.DoMsgHandler(request)
  64. }
  65. }
  66. }
  67. //StartWorkerPool 启动worker工作池
  68. func (mh *MsgHandle) StartWorkerPool() {
  69. //遍历需要启动worker的数量,依此启动
  70. for i := 0; i < int(mh.WorkerPoolSize); i++ {
  71. //一个worker被启动
  72. //给当前worker对应的任务队列开辟空间
  73. mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
  74. //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
  75. go mh.StartOneWorker(i, mh.TaskQueue[i])
  76. }
  77. }