Browse Source

添加消息队列及worker任务池机制 Zinx-V0.8

master
aceld 6 years ago
parent
commit
630248b1e9
  1. 4
      utils/globalobj.go
  2. 2
      ziface/iconnection.go
  3. 3
      ziface/imsghandler.go
  4. 16
      znet/connection.go
  5. 44
      znet/msghandler.go
  6. 3
      znet/server.go

4
utils/globalobj.go

@ -26,6 +26,8 @@ type GlobalObj struct {
Version string //当前Zinx版本号 Version string //当前Zinx版本号
MaxPacketSize uint32 //都需数据包的最大值 MaxPacketSize uint32 //都需数据包的最大值
MaxConn int //当前服务器主机允许的最大链接个数 MaxConn int //当前服务器主机允许的最大链接个数
WorkerPoolSize uint32 //业务工作Worker池的数量
MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
/* /*
config file path config file path
@ -83,6 +85,8 @@ func init() {
MaxConn: 12000, MaxConn: 12000,
MaxPacketSize: 4096, MaxPacketSize: 4096,
ConfFilePath: "conf/zinx.json", ConfFilePath: "conf/zinx.json",
WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024,
} }
//从配置文件中加载一些用户配置的参数 //从配置文件中加载一些用户配置的参数

2
ziface/iconnection.go

@ -16,8 +16,6 @@ type IConnection interface {
RemoteAddr() net.Addr RemoteAddr() net.Addr
//直接将Message数据发送数据给远程的TCP客户端 //直接将Message数据发送数据给远程的TCP客户端
SendMsg(msgId uint32, data []byte) error SendMsg(msgId uint32, data []byte) error
//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端
//SendBuff(data []byte) error
} }

3
ziface/imsghandler.go

@ -1,8 +1,11 @@
package ziface package ziface
/* /*
消息管理抽象层 消息管理抽象层
*/ */
type IMsgHandle interface{ type IMsgHandle interface{
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息 DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑 AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
StartWorkerPool() //启动worker工作池
SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
} }

16
znet/connection.go

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"zinx/utils"
"zinx/ziface" "zinx/ziface"
) )
@ -21,10 +22,6 @@ type Connection struct {
ExitBuffChan chan bool ExitBuffChan chan bool
//无缓冲管道,用于读、写两个goroutine之间的消息通信 //无缓冲管道,用于读、写两个goroutine之间的消息通信
msgChan chan []byte msgChan chan []byte
//给缓冲队列发送数据的channel,
// 如果向缓冲队列发送数据,那么把数据发送到这个channel下
// SendBuffChan chan []byte
} }
@ -112,10 +109,16 @@ func (c *Connection) StartReader() {
conn:c, conn:c,
msg:msg, msg:msg,
} }
if utils.GlobalObject.WorkerPoolSize > 0 {
//已经启动工作池机制,将消息交给Worker处理
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
//从绑定好的消息和对应的处理方法中执行对应的Handle方法 //从绑定好的消息和对应的处理方法中执行对应的Handle方法
go c.MsgHandler.DoMsgHandler(&req) go c.MsgHandler.DoMsgHandler(&req)
} }
} }
}
//启动连接,让当前连接开始工作 //启动连接,让当前连接开始工作
func (c *Connection) Start() { func (c *Connection) Start() {
@ -188,8 +191,3 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
return nil return nil
} }
//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端
//func (c *Connection) SendBuff(data []byte) error {
// return nil
//}

44
znet/msghandler.go

@ -3,19 +3,38 @@ package znet
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"zinx/utils"
"zinx/ziface" "zinx/ziface"
) )
type MsgHandle struct { type MsgHandle struct {
Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性 Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
WorkerPoolSize uint32 //业务工作Worker池的数量
TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
} }
func NewMsgHandle() *MsgHandle { func NewMsgHandle() *MsgHandle {
return &MsgHandle{ return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter), Apis: make(map[uint32]ziface.IRouter),
WorkerPoolSize:utils.GlobalObject.WorkerPoolSize,
//一个worker对应一个queue
TaskQueue:make([]chan ziface.IRequest, utils.GlobalObject.WorkerPoolSize),
} }
} }
//将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle)SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
//得到需要处理此条连接的workerID
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
fmt.Println("Add ConnID=", request.GetConnection().GetConnID()," request msgID=", request.GetMsgID(), "to workerID=", workerID)
//将请求消息发送给任务队列
mh.TaskQueue[workerID] <- request
}
//马上以非阻塞方式处理消息 //马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler, ok := mh.Apis[request.GetMsgID()] handler, ok := mh.Apis[request.GetMsgID()]
@ -29,6 +48,7 @@ func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler.Handle(request) handler.Handle(request)
handler.PostHandle(request) handler.PostHandle(request)
} }
//为消息添加具体的处理逻辑 //为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) { func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在 //1 判断当前msg绑定的API处理方法是否已经存在
@ -40,5 +60,27 @@ func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
fmt.Println("Add api msgId = ", msgId) fmt.Println("Add api msgId = ", msgId)
} }
//启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
for {
select {
//有消息则取出队列的Request,并执行绑定的业务方法
case request := <-taskQueue:
mh.DoMsgHandler(request)
}
}
}
//启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i:= 0; i < int(mh.WorkerPoolSize); i++ {
//一个worker被启动
//给当前worker对应的任务队列开辟空间
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
//启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来
go mh.StartOneWorker(i, mh.TaskQueue[i])
}
}

3
znet/server.go

@ -48,6 +48,9 @@ func (s *Server) Start() {
//开启一个go去做服务端Linster业务 //开启一个go去做服务端Linster业务
go func() { go func() {
//0 启动worker工作池机制
s.msgHandler.StartWorkerPool()
//1 获取一个TCP的Addr //1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port))
if err != nil { if err != nil {

Loading…
Cancel
Save