diff --git a/utils/globalobj.go b/utils/globalobj.go index 8c358aa..c32f82a 100644 --- a/utils/globalobj.go +++ b/utils/globalobj.go @@ -23,14 +23,16 @@ type GlobalObj struct { /* Zinx */ - Version string //当前Zinx版本号 - MaxPacketSize uint32 //都需数据包的最大值 - MaxConn int //当前服务器主机允许的最大链接个数 + Version string //当前Zinx版本号 + MaxPacketSize uint32 //都需数据包的最大值 + MaxConn int //当前服务器主机允许的最大链接个数 + WorkerPoolSize uint32 //业务工作Worker池的数量 + MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量 /* config file path - */ - ConfFilePath string + */ + ConfFilePath string } /* @@ -53,7 +55,7 @@ func PathExists(path string) (bool, error) { //读取用户的配置文件 func (g *GlobalObj) Reload() { - if confFileExists, _ := PathExists(g.ConfFilePath) ; confFileExists != true { + if confFileExists, _ := PathExists(g.ConfFilePath); confFileExists != true { //fmt.Println("Config File ", g.ConfFilePath , " is not exist!!") return } @@ -82,7 +84,9 @@ func init() { Host: "0.0.0.0", MaxConn: 12000, MaxPacketSize: 4096, - ConfFilePath: "conf/zinx.json", + ConfFilePath: "conf/zinx.json", + WorkerPoolSize: 10, + MaxWorkerTaskLen: 1024, } //从配置文件中加载一些用户配置的参数 diff --git a/ziface/iconnection.go b/ziface/iconnection.go index 83e4ba5..030fd68 100644 --- a/ziface/iconnection.go +++ b/ziface/iconnection.go @@ -16,8 +16,6 @@ type IConnection interface { RemoteAddr() net.Addr //直接将Message数据发送数据给远程的TCP客户端 SendMsg(msgId uint32, data []byte) error - //将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 - //SendBuff(data []byte) error } diff --git a/ziface/imsghandler.go b/ziface/imsghandler.go index 36d8f9d..11f1c48 100644 --- a/ziface/imsghandler.go +++ b/ziface/imsghandler.go @@ -1,8 +1,11 @@ package ziface + /* 消息管理抽象层 */ type IMsgHandle interface{ DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息 AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑 + StartWorkerPool() //启动worker工作池 + SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理 } \ No newline at end of file diff --git a/znet/connection.go b/znet/connection.go index 459081f..b61faba 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "zinx/utils" "zinx/ziface" ) @@ -21,10 +22,6 @@ type Connection struct { ExitBuffChan chan bool //无缓冲管道,用于读、写两个goroutine之间的消息通信 msgChan chan []byte - //给缓冲队列发送数据的channel, - // 如果向缓冲队列发送数据,那么把数据发送到这个channel下 -// SendBuffChan chan []byte - } @@ -112,8 +109,14 @@ func (c *Connection) StartReader() { conn:c, msg:msg, } - //从绑定好的消息和对应的处理方法中执行对应的Handle方法 - go c.MsgHandler.DoMsgHandler(&req) + + if utils.GlobalObject.WorkerPoolSize > 0 { + //已经启动工作池机制,将消息交给Worker处理 + c.MsgHandler.SendMsgToTaskQueue(&req) + } else { + //从绑定好的消息和对应的处理方法中执行对应的Handle方法 + go c.MsgHandler.DoMsgHandler(&req) + } } } @@ -187,9 +190,4 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { c.msgChan <- msg return nil -} - -//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端 -//func (c *Connection) SendBuff(data []byte) error { -// return nil -//} +} \ No newline at end of file diff --git a/znet/msghandler.go b/znet/msghandler.go index 8fdf966..bdcb31e 100644 --- a/znet/msghandler.go +++ b/znet/msghandler.go @@ -3,21 +3,40 @@ package znet import ( "fmt" "strconv" + "zinx/utils" "zinx/ziface" ) -type MsgHandle struct{ - Apis map[uint32] ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性 +type MsgHandle struct { + Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性 + WorkerPoolSize uint32 //业务工作Worker池的数量 + TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列 } func NewMsgHandle() *MsgHandle { - return &MsgHandle { - Apis:make(map[uint32]ziface.IRouter), + return &MsgHandle{ + Apis: make(map[uint32]ziface.IRouter), + WorkerPoolSize:utils.GlobalObject.WorkerPoolSize, + //一个worker对应一个queue + TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize), } } +//将消息交给TaskQueue,由worker进行处理 +func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) { + //根据ConnID来分配当前的连接应该由哪个worker负责处理 + //轮询的平均分配法则 + + //得到需要处理此条连接的workerID + workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize + fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID) + //将请求消息发送给任务队列 + mh.TaskQueue[workerID] <- request +} + + //马上以非阻塞方式处理消息 -func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { +func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { handler, ok := mh.Apis[request.GetMsgID()] if !ok { fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!") @@ -29,6 +48,7 @@ func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { handler.Handle(request) handler.PostHandle(request) } + //为消息添加具体的处理逻辑 func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) { //1 判断当前msg绑定的API处理方法是否已经存在 @@ -40,5 +60,27 @@ func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) { fmt.Println("Add api msgId = ", msgId) } +//启动一个Worker工作流程 +func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { + fmt.Println("Worker ID = ", workerID, " is started.") + //不断的等待队列中的消息 + for { + select { + //有消息则取出队列的Request,并执行绑定的业务方法 + case request := <-taskQueue: + mh.DoMsgHandler(request) + } + } +} - +//启动worker工作池 +func (mh *MsgHandle) StartWorkerPool() { + //遍历需要启动worker的数量,依此启动 + for i:= 0; i < int(mh.WorkerPoolSize); i++ { + //一个worker被启动 + //给当前worker对应的任务队列开辟空间 + mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) + //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 + go mh.StartOneWorker(i, mh.TaskQueue[i]) + } +} \ No newline at end of file diff --git a/znet/server.go b/znet/server.go index eaf7640..34dfcec 100644 --- a/znet/server.go +++ b/znet/server.go @@ -48,6 +48,9 @@ func (s *Server) Start() { //开启一个go去做服务端Linster业务 go func() { + //0 启动worker工作池机制 + s.msgHandler.StartWorkerPool() + //1 获取一个TCP的Addr addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) if err != nil {