You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

88 lines
2.8 KiB

package znet
import (
"fmt"
"strconv"
"github.com/aceld/zinx/utils"
"github.com/aceld/zinx/ziface"
)
// MsgHandle -
type MsgHandle struct {
Apis map[uint32]ziface.IRouter //存放每个MsgID 所对应的处理方法的map属性
WorkerPoolSize uint32 //业务工作Worker池的数量
TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
}
//NewMsgHandle 创建MsgHandle
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
WorkerPoolSize: utils.GlobalObject.WorkerPoolSize,
//一个worker对应一个queue
TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
}
}
//SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
//fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
//将请求消息发送给任务队列
mh.TaskQueue[workerID] <- request
}
//DoMsgHandler 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler, ok := mh.Apis[request.GetMsgID()]
if !ok {
fmt.Println("api msgID = ", request.GetMsgID(), " is not FOUND!")
return
}
//执行对应处理方法
handler.PreHandle(request)
handler.Handle(request)
handler.PostHandle(request)
}
//AddRouter 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在
if _, ok := mh.Apis[msgID]; ok {
panic("repeated api , msgID = " + strconv.Itoa(int(msgID)))
}
//2 添加msg与api的绑定关系
mh.Apis[msgID] = router
fmt.Println("Add api msgID = ", msgID)
}
//StartOneWorker 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
//StartWorkerPool 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i := 0; i < int(mh.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go mh.StartOneWorker(i, mh.TaskQueue[i])
}
}