|
@ -1,13 +1,15 @@ |
|
|
package znet |
|
|
package znet |
|
|
|
|
|
|
|
|
import ( |
|
|
import ( |
|
|
|
|
|
"context" |
|
|
"errors" |
|
|
"errors" |
|
|
"fmt" |
|
|
"fmt" |
|
|
"github.com/aceld/zinx/utils" |
|
|
|
|
|
"github.com/aceld/zinx/ziface" |
|
|
|
|
|
"io" |
|
|
"io" |
|
|
"net" |
|
|
"net" |
|
|
"sync" |
|
|
"sync" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/aceld/zinx/utils" |
|
|
|
|
|
"github.com/aceld/zinx/ziface" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
type Connection struct { |
|
|
type Connection struct { |
|
@ -17,36 +19,35 @@ type Connection struct { |
|
|
Conn *net.TCPConn |
|
|
Conn *net.TCPConn |
|
|
//当前连接的ID 也可以称作为SessionID,ID全局唯一
|
|
|
//当前连接的ID 也可以称作为SessionID,ID全局唯一
|
|
|
ConnID uint32 |
|
|
ConnID uint32 |
|
|
//当前连接的关闭状态
|
|
|
|
|
|
isClosed bool |
|
|
|
|
|
//消息管理MsgId和对应处理方法的消息管理模块
|
|
|
//消息管理MsgId和对应处理方法的消息管理模块
|
|
|
MsgHandler ziface.IMsgHandle |
|
|
MsgHandler ziface.IMsgHandle |
|
|
//告知该链接已经退出/停止的channel
|
|
|
//告知该链接已经退出/停止的channel
|
|
|
ExitBuffChan chan bool |
|
|
|
|
|
|
|
|
ctx context.Context |
|
|
|
|
|
cancel context.CancelFunc |
|
|
//无缓冲管道,用于读、写两个goroutine之间的消息通信
|
|
|
//无缓冲管道,用于读、写两个goroutine之间的消息通信
|
|
|
msgChan chan []byte |
|
|
msgChan chan []byte |
|
|
//有缓冲管道,用于读、写两个goroutine之间的消息通信
|
|
|
//有缓冲管道,用于读、写两个goroutine之间的消息通信
|
|
|
msgBuffChan chan []byte |
|
|
msgBuffChan chan []byte |
|
|
|
|
|
|
|
|
|
|
|
sync.RWMutex |
|
|
//链接属性
|
|
|
//链接属性
|
|
|
property map[string]interface{} |
|
|
property map[string]interface{} |
|
|
//保护链接属性修改的锁
|
|
|
|
|
|
propertyLock sync.RWMutex |
|
|
|
|
|
|
|
|
//当前连接的关闭状态
|
|
|
|
|
|
isClosed bool |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//创建连接的方法
|
|
|
//创建连接的方法
|
|
|
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { |
|
|
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { |
|
|
//初始化Conn属性
|
|
|
//初始化Conn属性
|
|
|
c := &Connection{ |
|
|
c := &Connection{ |
|
|
TcpServer: server, |
|
|
|
|
|
Conn: conn, |
|
|
|
|
|
ConnID: connID, |
|
|
|
|
|
isClosed: false, |
|
|
|
|
|
MsgHandler: msgHandler, |
|
|
|
|
|
ExitBuffChan: make(chan bool, 1), |
|
|
|
|
|
msgChan: make(chan []byte), |
|
|
|
|
|
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), |
|
|
|
|
|
property: make(map[string]interface{}), |
|
|
|
|
|
|
|
|
TcpServer: server, |
|
|
|
|
|
Conn: conn, |
|
|
|
|
|
ConnID: connID, |
|
|
|
|
|
isClosed: false, |
|
|
|
|
|
MsgHandler: msgHandler, |
|
|
|
|
|
msgChan: make(chan []byte), |
|
|
|
|
|
msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), |
|
|
|
|
|
property: make(map[string]interface{}), |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//将新创建的Conn添加到链接管理中
|
|
|
//将新创建的Conn添加到链接管理中
|
|
@ -81,7 +82,7 @@ func (c *Connection) StartWriter() { |
|
|
fmt.Println("msgBuffChan is Closed") |
|
|
fmt.Println("msgBuffChan is Closed") |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
case <-c.ExitBuffChan: |
|
|
|
|
|
|
|
|
case <-c.ctx.Done(): |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
@ -96,53 +97,59 @@ func (c *Connection) StartReader() { |
|
|
defer c.Stop() |
|
|
defer c.Stop() |
|
|
|
|
|
|
|
|
for { |
|
|
for { |
|
|
// 创建拆包解包的对象
|
|
|
|
|
|
dp := NewDataPack() |
|
|
|
|
|
|
|
|
|
|
|
//读取客户端的Msg head
|
|
|
|
|
|
headData := make([]byte, dp.GetHeadLen()) |
|
|
|
|
|
if _, err := io.ReadFull(c.Conn, headData); err != nil { |
|
|
|
|
|
fmt.Println("read msg head error ", err) |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
//fmt.Printf("read headData %+v\n", headData)
|
|
|
|
|
|
|
|
|
|
|
|
//拆包,得到msgid 和 datalen 放在msg中
|
|
|
|
|
|
msg, err := dp.Unpack(headData) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
fmt.Println("unpack error ", err) |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
select { |
|
|
|
|
|
case <-c.ctx.Done(): |
|
|
|
|
|
return |
|
|
|
|
|
default: |
|
|
|
|
|
// 创建拆包解包的对象
|
|
|
|
|
|
dp := NewDataPack() |
|
|
|
|
|
|
|
|
|
|
|
//读取客户端的Msg head
|
|
|
|
|
|
headData := make([]byte, dp.GetHeadLen()) |
|
|
|
|
|
if _, err := io.ReadFull(c.Conn, headData); err != nil { |
|
|
|
|
|
fmt.Println("read msg head error ", err) |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
//fmt.Printf("read headData %+v\n", headData)
|
|
|
|
|
|
|
|
|
//根据 dataLen 读取 data,放在msg.Data中
|
|
|
|
|
|
var data []byte |
|
|
|
|
|
if msg.GetDataLen() > 0 { |
|
|
|
|
|
data = make([]byte, msg.GetDataLen()) |
|
|
|
|
|
if _, err := io.ReadFull(c.Conn, data); err != nil { |
|
|
|
|
|
fmt.Println("read msg data error ", err) |
|
|
|
|
|
|
|
|
//拆包,得到msgid 和 datalen 放在msg中
|
|
|
|
|
|
msg, err := dp.Unpack(headData) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
fmt.Println("unpack error ", err) |
|
|
break |
|
|
break |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
msg.SetData(data) |
|
|
|
|
|
|
|
|
|
|
|
//得到当前客户端请求的Request数据
|
|
|
|
|
|
req := Request{ |
|
|
|
|
|
conn: c, |
|
|
|
|
|
msg: msg, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
//根据 dataLen 读取 data,放在msg.Data中
|
|
|
|
|
|
var data []byte |
|
|
|
|
|
if msg.GetDataLen() > 0 { |
|
|
|
|
|
data = make([]byte, msg.GetDataLen()) |
|
|
|
|
|
if _, err := io.ReadFull(c.Conn, data); err != nil { |
|
|
|
|
|
fmt.Println("read msg data error ", err) |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
} |
|
|
|
|
|
msg.SetData(data) |
|
|
|
|
|
|
|
|
if utils.GlobalObject.WorkerPoolSize > 0 { |
|
|
|
|
|
//已经启动工作池机制,将消息交给Worker处理
|
|
|
|
|
|
c.MsgHandler.SendMsgToTaskQueue(&req) |
|
|
|
|
|
} else { |
|
|
|
|
|
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
|
|
|
|
|
|
go c.MsgHandler.DoMsgHandler(&req) |
|
|
|
|
|
|
|
|
//得到当前客户端请求的Request数据
|
|
|
|
|
|
req := Request{ |
|
|
|
|
|
conn: c, |
|
|
|
|
|
msg: msg, |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if utils.GlobalObject.WorkerPoolSize > 0 { |
|
|
|
|
|
//已经启动工作池机制,将消息交给Worker处理
|
|
|
|
|
|
c.MsgHandler.SendMsgToTaskQueue(&req) |
|
|
|
|
|
} else { |
|
|
|
|
|
//从绑定好的消息和对应的处理方法中执行对应的Handle方法
|
|
|
|
|
|
go c.MsgHandler.DoMsgHandler(&req) |
|
|
|
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//启动连接,让当前连接开始工作
|
|
|
//启动连接,让当前连接开始工作
|
|
|
func (c *Connection) Start() { |
|
|
func (c *Connection) Start() { |
|
|
|
|
|
c.ctx, c.cancel = context.WithCancel(context.Background()) |
|
|
//1 开启用户从客户端读取数据流程的Goroutine
|
|
|
//1 开启用户从客户端读取数据流程的Goroutine
|
|
|
go c.StartReader() |
|
|
go c.StartReader() |
|
|
//2 开启用于写回客户端数据流程的Goroutine
|
|
|
//2 开启用于写回客户端数据流程的Goroutine
|
|
@ -155,6 +162,9 @@ func (c *Connection) Start() { |
|
|
func (c *Connection) Stop() { |
|
|
func (c *Connection) Stop() { |
|
|
fmt.Println("Conn Stop()...ConnID = ", c.ConnID) |
|
|
fmt.Println("Conn Stop()...ConnID = ", c.ConnID) |
|
|
//如果当前链接已经关闭
|
|
|
//如果当前链接已经关闭
|
|
|
|
|
|
c.Lock() |
|
|
|
|
|
defer c.Unlock() |
|
|
|
|
|
|
|
|
if c.isClosed == true { |
|
|
if c.isClosed == true { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -166,13 +176,12 @@ func (c *Connection) Stop() { |
|
|
// 关闭socket链接
|
|
|
// 关闭socket链接
|
|
|
c.Conn.Close() |
|
|
c.Conn.Close() |
|
|
//关闭Writer
|
|
|
//关闭Writer
|
|
|
c.ExitBuffChan <- true |
|
|
|
|
|
|
|
|
c.cancel() |
|
|
|
|
|
|
|
|
//将链接从连接管理器中删除
|
|
|
//将链接从连接管理器中删除
|
|
|
c.TcpServer.GetConnMgr().Remove(c) |
|
|
c.TcpServer.GetConnMgr().Remove(c) |
|
|
|
|
|
|
|
|
//关闭该链接全部管道
|
|
|
//关闭该链接全部管道
|
|
|
close(c.ExitBuffChan) |
|
|
|
|
|
close(c.msgBuffChan) |
|
|
close(c.msgBuffChan) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
@ -193,9 +202,13 @@ func (c *Connection) RemoteAddr() net.Addr { |
|
|
|
|
|
|
|
|
//直接将Message数据发送数据给远程的TCP客户端
|
|
|
//直接将Message数据发送数据给远程的TCP客户端
|
|
|
func (c *Connection) SendMsg(msgId uint32, data []byte) error { |
|
|
func (c *Connection) SendMsg(msgId uint32, data []byte) error { |
|
|
|
|
|
c.RLock() |
|
|
if c.isClosed == true { |
|
|
if c.isClosed == true { |
|
|
|
|
|
c.RUnlock() |
|
|
return errors.New("connection closed when send msg") |
|
|
return errors.New("connection closed when send msg") |
|
|
} |
|
|
} |
|
|
|
|
|
c.RUnlock() |
|
|
|
|
|
|
|
|
//将data封包,并且发送
|
|
|
//将data封包,并且发送
|
|
|
dp := NewDataPack() |
|
|
dp := NewDataPack() |
|
|
msg, err := dp.Pack(NewMsgPackage(msgId, data)) |
|
|
msg, err := dp.Pack(NewMsgPackage(msgId, data)) |
|
@ -211,9 +224,13 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { |
|
|
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { |
|
|
|
|
|
c.RLock() |
|
|
if c.isClosed == true { |
|
|
if c.isClosed == true { |
|
|
|
|
|
c.RUnlock() |
|
|
return errors.New("Connection closed when send buff msg") |
|
|
return errors.New("Connection closed when send buff msg") |
|
|
} |
|
|
} |
|
|
|
|
|
c.RUnlock() |
|
|
|
|
|
|
|
|
//将data封包,并且发送
|
|
|
//将data封包,并且发送
|
|
|
dp := NewDataPack() |
|
|
dp := NewDataPack() |
|
|
msg, err := dp.Pack(NewMsgPackage(msgId, data)) |
|
|
msg, err := dp.Pack(NewMsgPackage(msgId, data)) |
|
@ -230,16 +247,16 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { |
|
|
|
|
|
|
|
|
//设置链接属性
|
|
|
//设置链接属性
|
|
|
func (c *Connection) SetProperty(key string, value interface{}) { |
|
|
func (c *Connection) SetProperty(key string, value interface{}) { |
|
|
c.propertyLock.Lock() |
|
|
|
|
|
defer c.propertyLock.Unlock() |
|
|
|
|
|
|
|
|
c.Lock() |
|
|
|
|
|
defer c.Unlock() |
|
|
|
|
|
|
|
|
c.property[key] = value |
|
|
c.property[key] = value |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
//获取链接属性
|
|
|
//获取链接属性
|
|
|
func (c *Connection) GetProperty(key string) (interface{}, error) { |
|
|
func (c *Connection) GetProperty(key string) (interface{}, error) { |
|
|
c.propertyLock.RLock() |
|
|
|
|
|
defer c.propertyLock.RUnlock() |
|
|
|
|
|
|
|
|
c.RLock() |
|
|
|
|
|
defer c.RUnlock() |
|
|
|
|
|
|
|
|
if value, ok := c.property[key]; ok { |
|
|
if value, ok := c.property[key]; ok { |
|
|
return value, nil |
|
|
return value, nil |
|
@ -250,8 +267,8 @@ func (c *Connection) GetProperty(key string) (interface{}, error) { |
|
|
|
|
|
|
|
|
//移除链接属性
|
|
|
//移除链接属性
|
|
|
func (c *Connection) RemoveProperty(key string) { |
|
|
func (c *Connection) RemoveProperty(key string) { |
|
|
c.propertyLock.Lock() |
|
|
|
|
|
defer c.propertyLock.Unlock() |
|
|
|
|
|
|
|
|
c.Lock() |
|
|
|
|
|
defer c.Unlock() |
|
|
|
|
|
|
|
|
delete(c.property, key) |
|
|
delete(c.property, key) |
|
|
} |
|
|
} |