From 09ab50231ae13be8c539f37b2e8a375dacc1e601 Mon Sep 17 00:00:00 2001 From: aceld Date: Mon, 18 Feb 2019 15:08:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E9=93=BE=E6=8E=A5=E5=B1=9E?= =?UTF-8?q?=E6=80=A7=E9=85=8D=E7=BD=AE=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ziface/iconnection.go | 10 ++++ znet/connection.go | 123 ++++++++++++++++++++++++++---------------- 2 files changed, 88 insertions(+), 45 deletions(-) diff --git a/ziface/iconnection.go b/ziface/iconnection.go index ae644d4..cff393f 100644 --- a/ziface/iconnection.go +++ b/ziface/iconnection.go @@ -8,16 +8,26 @@ type IConnection interface { Start() //停止连接,结束当前连接状态M Stop() + //从当前连接获取原始的socket TCPConn GetTCPConnection() *net.TCPConn //获取当前连接ID GetConnID() uint32 //获取远程客户端地址信息 RemoteAddr() net.Addr + //直接将Message数据发送数据给远程的TCP客户端(无缓冲) SendMsg(msgId uint32, data []byte) error //直接将Message数据发送给远程的TCP客户端(有缓冲) SendBuffMsg(msgId uint32, data []byte) error + + //设置链接属性 + SetProperty(key string, value interface{}) + //获取链接属性 + GetProperty(key string)(interface{}, error) + //移除链接属性 + RemoveProperty(key string) + } diff --git a/znet/connection.go b/znet/connection.go index 50efb10..b89d88b 100644 --- a/znet/connection.go +++ b/znet/connection.go @@ -5,13 +5,14 @@ import ( "fmt" "io" "net" + "sync" "zinx/utils" "zinx/ziface" ) type Connection struct { //当前Conn属于哪个Server - TcpServer ziface.IServer + TcpServer ziface.IServer //当前连接的socket TCP套接字 Conn *net.TCPConn //当前连接的ID 也可以称作为SessionID,ID全局唯一 @@ -23,24 +24,29 @@ type Connection struct { //告知该链接已经退出/停止的channel ExitBuffChan chan bool //无缓冲管道,用于读、写两个goroutine之间的消息通信 - msgChan chan []byte + msgChan chan []byte //有关冲管道,用于读、写两个goroutine之间的消息通信 msgBuffChan chan []byte -} + //链接属性 + property map[string]interface{} + //保护链接属性修改的锁 + propertyLock sync.RWMutex +} //创建连接的方法 -func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{ +func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { //初始化Conn属性 c := &Connection{ - TcpServer:server, - Conn: conn, - ConnID: connID, - isClosed: false, - MsgHandler: msgHandler, + TcpServer: server, + Conn: conn, + ConnID: connID, + isClosed: false, + MsgHandler: msgHandler, ExitBuffChan: make(chan bool, 1), - msgChan:make(chan []byte), - msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen), + msgChan: make(chan []byte), + msgBuffChan: make(chan []byte, utils.GlobalObject.MaxMsgChanLen), + property: make(map[string]interface{}), } //将新创建的Conn添加到链接管理中 @@ -50,40 +56,39 @@ func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHan /* 写消息Goroutine, 用户将数据发送给客户端 - */ - func (c *Connection) StartWriter() { - fmt.Println("[Writer Goroutine is running]") - defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]") - - for { - select { - case data := <-c.msgChan: - //有数据要写给客户端 - if _, err := c.Conn.Write(data); err != nil { - fmt.Println("Send Data error:, ", err, " Conn Writer exit") - return - } - case data, ok:= <-c.msgBuffChan: - if ok { - //有数据要写给客户端 - if _, err := c.Conn.Write(data); err != nil { - fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit") - return - } - } else { - break - fmt.Println("msgBuffChan is Closed") +*/ +func (c *Connection) StartWriter() { + fmt.Println("[Writer Goroutine is running]") + defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]") + + for { + select { + case data := <-c.msgChan: + //有数据要写给客户端 + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send Data error:, ", err, " Conn Writer exit") + return + } + case data, ok := <-c.msgBuffChan: + if ok { + //有数据要写给客户端 + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit") + return } - case <-c.ExitBuffChan: - return + } else { + break + fmt.Println("msgBuffChan is Closed") + } + case <-c.ExitBuffChan: + return } } - } - +} /* 读消息Goroutine,用于从客户端中读取数据 - */ +*/ func (c *Connection) StartReader() { fmt.Println("[Reader Goroutine is running]") defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]") @@ -101,7 +106,7 @@ func (c *Connection) StartReader() { } //拆包,得到msgid 和 datalen 放在msg中 - msg , err := dp.Unpack(headData) + msg, err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error ", err) break @@ -120,8 +125,8 @@ func (c *Connection) StartReader() { //得到当前客户端请求的Request数据 req := Request{ - conn:c, - msg:msg, + conn: c, + msg: msg, } if utils.GlobalObject.WorkerPoolSize > 0 { @@ -175,7 +180,7 @@ func (c *Connection) GetTCPConnection() *net.TCPConn { } //获取当前连接ID -func (c *Connection) GetConnID() uint32{ +func (c *Connection) GetConnID() uint32 { return c.ConnID } @@ -194,7 +199,7 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error { msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = ", msgId) - return errors.New("Pack error msg ") + return errors.New("Pack error msg ") } //写回客户端 @@ -212,7 +217,7 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { msg, err := dp.Pack(NewMsgPackage(msgId, data)) if err != nil { fmt.Println("Pack error msg id = ", msgId) - return errors.New("Pack error msg ") + return errors.New("Pack error msg ") } //写回客户端 @@ -220,3 +225,31 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error { return nil } + +//设置链接属性 +func (c *Connection) SetProperty(key string, value interface{}) { + c.propertyLock.Lock() + defer c.propertyLock.Unlock() + + c.property[key] = value +} + +//获取链接属性 +func (c *Connection) GetProperty(key string) (interface{}, error) { + c.propertyLock.RLock() + defer c.propertyLock.RUnlock() + + if value, ok := c.property[key]; ok { + return value, nil + } else { + return nil, errors.New("no property found") + } +} + +//移除链接属性 +func (c *Connection) RemoveProperty(key string) { + c.propertyLock.Lock() + defer c.propertyLock.Unlock() + + delete(c.property, key) +}