Browse Source

添加链接管理模块

master
aceld 6 years ago
parent
commit
5836c612d0
  1. 2
      README.md
  2. 2
      utils/globalobj.go
  3. 4
      ziface/iconnection.go
  4. 10
      ziface/iserver.go
  5. 77
      znet/connection.go
  6. 49
      znet/server.go

2
README.md

@ -28,7 +28,7 @@ Zinx框架的项目制作采用编码和学习教程同步进行,将开发的
`mail`: `mail`:
[danbing.at@gmail.com](mailto:danbing.at@gmail.com) [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
`github`: `github`:
[https://github.com/aceld/zinx](https://github.com/aceld/zinx)
[https://github.com/aceld](https://github.com/aceld)
`原创书籍gitbook`: `原创书籍gitbook`:
[http://legacy.gitbook.com/@aceld](http://legacy.gitbook.com/@aceld) [http://legacy.gitbook.com/@aceld](http://legacy.gitbook.com/@aceld)

2
utils/globalobj.go

@ -28,6 +28,7 @@ type GlobalObj struct {
MaxConn int //当前服务器主机允许的最大链接个数 MaxConn int //当前服务器主机允许的最大链接个数
WorkerPoolSize uint32 //业务工作Worker池的数量 WorkerPoolSize uint32 //业务工作Worker池的数量
MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量 MaxWorkerTaskLen uint32 //业务工作Worker对应负责的任务队列最大任务存储数量
MaxMsgChanLen uint32 //SendBuffMsg发送消息的缓冲最大长度
/* /*
config file path config file path
@ -87,6 +88,7 @@ func init() {
ConfFilePath: "conf/zinx.json", ConfFilePath: "conf/zinx.json",
WorkerPoolSize: 10, WorkerPoolSize: 10,
MaxWorkerTaskLen: 1024, MaxWorkerTaskLen: 1024,
MaxMsgChanLen:1024,
} }
//从配置文件中加载一些用户配置的参数 //从配置文件中加载一些用户配置的参数

4
ziface/iconnection.go

@ -14,8 +14,10 @@ type IConnection interface {
GetConnID() uint32 GetConnID() uint32
//获取远程客户端地址信息 //获取远程客户端地址信息
RemoteAddr() net.Addr RemoteAddr() net.Addr
//直接将Message数据发送数据给远程的TCP客户端
//直接将Message数据发送数据给远程的TCP客户端(无缓冲)
SendMsg(msgId uint32, data []byte) error SendMsg(msgId uint32, data []byte) error
//直接将Message数据发送给远程的TCP客户端(有缓冲)
SendBuffMsg(msgId uint32, data []byte) error
} }

10
ziface/iserver.go

@ -10,4 +10,14 @@ type IServer interface{
Serve() Serve()
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
AddRouter(msgId uint32, router IRouter) AddRouter(msgId uint32, router IRouter)
//得到链接管理
GetConnMgr() IConnManager
//设置该Server的连接创建时Hook函数
SetOnConnStart(func (IConnection))
//设置该Server的连接断开时的Hook函数
SetOnConnStop(func (IConnection))
//调用连接OnConnStart Hook函数
CallOnConnStart(conn IConnection)
//调用连接OnConnStop Hook函数
CallOnConnStop(conn IConnection)
} }

77
znet/connection.go

@ -10,6 +10,8 @@ import (
) )
type Connection struct { type Connection struct {
//当前Conn属于哪个Server
TcpServer ziface.IServer
//当前连接的socket TCP套接字 //当前连接的socket TCP套接字
Conn *net.TCPConn Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一 //当前连接的ID 也可以称作为SessionID,ID全局唯一
@ -22,21 +24,27 @@ type Connection struct {
ExitBuffChan chan bool ExitBuffChan chan bool
//无缓冲管道,用于读、写两个goroutine之间的消息通信 //无缓冲管道,用于读、写两个goroutine之间的消息通信
msgChan chan []byte msgChan chan []byte
//有关冲管道,用于读、写两个goroutine之间的消息通信
msgBuffChan chan []byte
} }
//创建连接的方法 //创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
//初始化Conn属性
c := &Connection{ c := &Connection{
TcpServer:server,
Conn: conn, Conn: conn,
ConnID: connID, ConnID: connID,
isClosed: false, isClosed: false,
MsgHandler: msgHandler, MsgHandler: msgHandler,
ExitBuffChan: make(chan bool, 1), ExitBuffChan: make(chan bool, 1),
msgChan:make(chan []byte), msgChan:make(chan []byte),
// SendBuffChan: make(chan []byte, 512),
msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen),
} }
//将新创建的Conn添加到链接管理中
c.TcpServer.GetConnMgr().Add(c)
return c return c
} }
@ -44,9 +52,8 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle)
写消息Goroutine 用户将数据发送给客户端 写消息Goroutine 用户将数据发送给客户端
*/ */
func (c *Connection) StartWriter() { func (c *Connection) StartWriter() {
defer fmt.Println(c.RemoteAddr().String(), " conn Writer exit!")
defer c.Stop()
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
for { for {
select { select {
@ -56,6 +63,16 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle)
fmt.Println("Send Data error:, ", err, " Conn Writer exit") fmt.Println("Send Data error:, ", err, " Conn Writer exit")
return return
} }
case data, ok:= <-c.msgBuffChan:
if ok {
//有数据要写给客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
return
}
} else {
fmt.Println("msgBuffChan is Closed")
}
case <- c.ExitBuffChan: case <- c.ExitBuffChan:
//conn已经关闭 //conn已经关闭
return return
@ -68,9 +85,8 @@ func NewConntion(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle)
读消息Goroutine用于从客户端中读取数据 读消息Goroutine用于从客户端中读取数据
*/ */
func (c *Connection) StartReader() { func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
fmt.Println("[Reader Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
for { for {
// 创建拆包解包的对象 // 创建拆包解包的对象
@ -80,16 +96,14 @@ func (c *Connection) StartReader() {
headData := make([]byte, dp.GetHeadLen()) headData := make([]byte, dp.GetHeadLen())
if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil { if _, err := io.ReadFull(c.GetTCPConnection(), headData); err != nil {
fmt.Println("read msg head error ", err) fmt.Println("read msg head error ", err)
c.ExitBuffChan <- true
continue
break
} }
//拆包,得到msgid 和 datalen 放在msg中 //拆包,得到msgid 和 datalen 放在msg中
msg , err := dp.Unpack(headData) msg , err := dp.Unpack(headData)
if err != nil { if err != nil {
fmt.Println("unpack error ", err) fmt.Println("unpack error ", err)
c.ExitBuffChan <- true
continue
break
} }
//根据 dataLen 读取 data,放在msg.Data中 //根据 dataLen 读取 data,放在msg.Data中
@ -98,8 +112,7 @@ func (c *Connection) StartReader() {
data = make([]byte, msg.GetDataLen()) data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil { if _, err := io.ReadFull(c.GetTCPConnection(), data); err != nil {
fmt.Println("read msg data error ", err) fmt.Println("read msg data error ", err)
c.ExitBuffChan <- true
continue
break
} }
} }
msg.SetData(data) msg.SetData(data)
@ -118,20 +131,28 @@ func (c *Connection) StartReader() {
go c.MsgHandler.DoMsgHandler(&req) go c.MsgHandler.DoMsgHandler(&req)
} }
} }
c.ExitBuffChan <- true
} }
//启动连接,让当前连接开始工作 //启动连接,让当前连接开始工作
func (c *Connection) Start() { func (c *Connection) Start() {
//Start()函数结束的时候就应该调用Stop处理善后业务
defer c.Stop()
//1 开启用户从客户端读取数据流程的Goroutine //1 开启用户从客户端读取数据流程的Goroutine
go c.StartReader() go c.StartReader()
//2 开启用于写回客户端数据流程的Goroutine //2 开启用于写回客户端数据流程的Goroutine
go c.StartWriter() go c.StartWriter()
//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
c.TcpServer.CallOnConnStart(c)
for { for {
select { select {
case <- c.ExitBuffChan: case <- c.ExitBuffChan:
//得到退出消息,不再阻塞 //得到退出消息,不再阻塞
fmt.Println("Start recv ExitBuffChan...")
return return
} }
} }
@ -139,23 +160,25 @@ func (c *Connection) Start() {
//停止连接,结束当前连接状态M //停止连接,结束当前连接状态M
func (c *Connection) Stop() { func (c *Connection) Stop() {
fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
//1. 如果当前链接已经关闭 //1. 如果当前链接已经关闭
if c.isClosed == true { if c.isClosed == true {
return return
} }
c.isClosed = true c.isClosed = true
//TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
c.TcpServer.CallOnConnStop(c)
// 关闭socket链接 // 关闭socket链接
c.Conn.Close() c.Conn.Close()
//通知从缓冲队列读数据的业务,该链接已经关闭
c.ExitBuffChan <- true
//将链接从连接管理器中删除
c.TcpServer.GetConnMgr().Remove(c)
//关闭该链接全部管道 //关闭该链接全部管道
close(c.ExitBuffChan) close(c.ExitBuffChan)
//close(c.SendBuffChan)
close(c.msgBuffChan)
} }
//从当前连接获取原始的socket TCPConn //从当前连接获取原始的socket TCPConn
@ -191,3 +214,21 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
return nil return nil
} }
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("Connection closed when send buff msg")
}
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
return errors.New("Pack error msg ")
}
//写回客户端
c.msgBuffChan <- msg
return nil
}

49
znet/server.go

@ -19,6 +19,12 @@ type Server struct {
Port int Port int
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法 //当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
msgHandler ziface.IMsgHandle msgHandler ziface.IMsgHandle
//当前Server的链接管理器
ConnMgr ziface.IConnManager
//该Server的连接创建时Hook函数
OnConnStart func(conn ziface.IConnection)
//该Server的连接断开时的Hook函数
OnConnStop func(conn ziface.IConnection)
} }
/* /*
@ -33,6 +39,7 @@ func NewServer () ziface.IServer {
IP:utils.GlobalObject.Host, IP:utils.GlobalObject.Host,
Port:utils.GlobalObject.TcpPort, Port:utils.GlobalObject.TcpPort,
msgHandler: NewMsgHandle(), msgHandler: NewMsgHandle(),
ConnMgr:NewConnManager(),
} }
return s return s
} }
@ -81,10 +88,14 @@ func (s *Server) Start() {
continue continue
} }
//3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn {
conn.Close()
continue
}
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的 //3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(conn, cid, s.msgHandler)
dealConn := NewConntion(s, conn, cid, s.msgHandler)
cid ++ cid ++
//3.4 启动当前链接的处理业务 //3.4 启动当前链接的处理业务
@ -113,4 +124,38 @@ func (s *Server)AddRouter(msgId uint32, router ziface.IRouter) {
s.msgHandler.AddRouter(msgId, router) s.msgHandler.AddRouter(msgId, router)
} }
//得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
//设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func (ziface.IConnection)) {
s.OnConnStart = hookFunc
}
//设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func (ziface.IConnection)) {
s.OnConnStop = hookFunc
}
//调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.OnConnStart != nil {
s.OnConnStart(conn)
}
}
//调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.OnConnStop != nil {
s.OnConnStop(conn)
}
}
//捕捉信号处理函数,对服务器退出平滑处理
func (s *Server) WaitSignal() {
}
Loading…
Cancel
Save