Browse Source

Merge pull request #111 from elvin-zheng/master

[pref]: change packet IO module
master
刘丹冰 3 years ago
committed by GitHub
parent
commit
a0ce5647b6
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      ziface/ipacket.go
  2. 1
      ziface/iserver.go
  3. 15
      znet/connection.go
  4. 56
      znet/connmanager.go
  5. 6
      znet/datapack.go
  6. 12
      znet/options.go
  7. 16
      znet/server.go

8
ziface/ipacket.go

@ -0,0 +1,8 @@
package ziface
type Packet interface {
Unpack(binaryData []byte) (IMessage, error)
Pack(msg IMessage) ([]byte, error)
GetHeadLen() uint32
}

1
ziface/iserver.go

@ -24,4 +24,5 @@ type IServer interface {
SetOnConnStop(func(IConnection)) //设置该Server的连接断开时的Hook函数
CallOnConnStart(conn IConnection) //调用连接OnConnStart Hook函数
CallOnConnStop(conn IConnection) //调用连接OnConnStop Hook函数
Packet() Packet
}

15
znet/connection.go

@ -39,8 +39,8 @@ type Connection struct {
isClosed bool
}
//NewConntion 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
//NewConnection 创建连接的方法
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
//初始化Conn属性
c := &Connection{
TCPServer: server,
@ -95,16 +95,15 @@ func (c *Connection) StartReader() {
defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
defer c.Stop()
// 创建拆包解包的对象
for {
select {
case <-c.ctx.Done():
return
default:
// 创建拆包解包的对象
dp := NewDataPack()
//读取客户端的Msg head
headData := make([]byte, dp.GetHeadLen())
headData := make([]byte, c.TCPServer.Packet().GetHeadLen())
if _, err := io.ReadFull(c.Conn, headData); err != nil {
fmt.Println("read msg head error ", err)
return
@ -112,7 +111,7 @@ func (c *Connection) StartReader() {
//fmt.Printf("read headData %+v\n", headData)
//拆包,得到msgID 和 datalen 放在msg中
msg, err := dp.Unpack(headData)
msg, err := c.TCPServer.Packet().Unpack(headData)
if err != nil {
fmt.Println("unpack error ", err)
return
@ -213,7 +212,7 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error {
}
//将data封包,并且发送
dp := NewDataPack()
dp := c.TCPServer.Packet()
msg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg ID = ", msgID)
@ -235,7 +234,7 @@ func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
}
//将data封包,并且发送
dp := NewDataPack()
dp := c.TCPServer.Packet()
msg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg ID = ", msgID)

56
znet/connmanager.go

@ -3,55 +3,49 @@ package znet
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"github.com/aceld/zinx/ziface"
)
//ConnManager 连接管理模块
type ConnManager struct {
connections map[uint32]ziface.IConnection //管理的连接信息
connLock sync.RWMutex //读写连接的读写锁
connections atomic.Value
}
//NewConnManager 创建一个链接管理
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection),
}
var cm = &ConnManager{}
connections := make(map[uint32]ziface.IConnection)
cm.connections.Store(connections)
return cm
}
//Add 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
//将conn连接添加到ConnMananger中
connMgr.connections[conn.GetConnID()] = conn
connections[conn.GetConnID()] = conn
connMgr.connections.Store(connections)
fmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}
//Remove 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
//删除连接信息
delete(connMgr.connections, conn.GetConnID())
delete(connections, conn.GetConnID())
connMgr.connections.Store(connections)
fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}
//Get 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
//保护共享资源Map 加读锁
connMgr.connLock.RLock()
defer connMgr.connLock.RUnlock()
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
if conn, ok := connMgr.connections[connID]; ok {
if conn, ok := connections[connID]; ok {
return conn, nil
}
@ -61,37 +55,35 @@ func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
//Len 获取当前连接
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
return len(connections)
}
//ClearConn 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
//停止并删除全部的连接信息
for connID, conn := range connMgr.connections {
for connID, conn := range connections {
//停止
conn.Stop()
//删除
delete(connMgr.connections, connID)
delete(connections, connID)
}
connMgr.connections.Store(connections)
fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
//ClearOneConn 利用ConnID获取一个链接 并且删除
func (connMgr *ConnManager) ClearOneConn(connID uint32) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
connections:=connMgr.connections.Load().(map[uint32]ziface.IConnection)
if conn, ok := connMgr.connections[connID]; !ok {
if conn, ok := connections[connID]; !ok {
//停止
conn.Stop()
//删除
delete(connMgr.connections, connID)
delete(connections, connID)
connMgr.connections.Store(connections)
fmt.Println("Clear Connections ID: ", connID, "succeed")
return
}

6
znet/datapack.go

@ -9,18 +9,20 @@ import (
"github.com/aceld/zinx/ziface"
)
var defaultHeaderLen uint32 = 8
//DataPack 封包拆包类实例,暂时不需要成员
type DataPack struct{}
//NewDataPack 封包拆包实例初始化方法
func NewDataPack() *DataPack {
func NewDataPack() ziface.Packet {
return &DataPack{}
}
//GetHeadLen 获取包头长度方法
func (dp *DataPack) GetHeadLen() uint32 {
//ID uint32(4字节) + DataLen uint32(4字节)
return 8
return defaultHeaderLen
}
//Pack 封包方法(压缩数据)

12
znet/options.go

@ -0,0 +1,12 @@
package znet
import "github.com/aceld/zinx/ziface"
type Option func(s *Server)
// 只要实现Packet 接口可自由实现数据包解析格式,如果没有则使用默认解析格式
func WithPacket(pack ziface.Packet) Option {
return func(s *Server) {
s.packet = pack
}
}

16
znet/server.go

@ -39,10 +39,12 @@ type Server struct {
OnConnStart func(conn ziface.IConnection)
//该Server的连接断开时的Hook函数
OnConnStop func(conn ziface.IConnection)
packet ziface.Packet
}
//NewServer 创建一个服务器句柄
func NewServer() ziface.IServer {
func NewServer(opts ...Option) ziface.IServer {
printLogo()
s := &Server{
@ -52,7 +54,13 @@ func NewServer() ziface.IServer {
Port: utils.GlobalObject.TCPPort,
msgHandler: NewMsgHandle(),
ConnMgr: NewConnManager(),
packet: NewDataPack(),
}
for _, opt := range opts {
opt(s)
}
return s
}
@ -105,7 +113,7 @@ func (s *Server) Start() {
}
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(s, conn, cID, s.msgHandler)
dealConn := NewConnection(s, conn, cID, s.msgHandler)
cID++
//3.4 启动当前链接的处理业务
@ -168,6 +176,10 @@ func (s *Server) CallOnConnStop(conn ziface.IConnection) {
}
}
func (s *Server) Packet() ziface.Packet {
return s.packet
}
func printLogo() {
fmt.Println(zinxLogo)
fmt.Println(topLine)

Loading…
Cancel
Save