Browse Source

代码正规化

master
kstwoak47 4 years ago
parent
commit
caede13943
  1. 4
      ziface/iconnection.go
  2. 4
      ziface/imessage.go
  3. 2
      ziface/imsghandler.go
  4. 2
      ziface/iserver.go
  5. 13
      zinx_app_demo/mmo_game/api/move.go
  6. 11
      zinx_app_demo/mmo_game/api/world_chat.go
  7. 45
      zinx_app_demo/mmo_game/client_AI_robot.go
  8. 92
      zinx_app_demo/mmo_game/core/aoi.go
  9. 16
      zinx_app_demo/mmo_game/core/aoi_test.go
  10. 16
      zinx_app_demo/mmo_game/core/grid.go
  11. 161
      zinx_app_demo/mmo_game/core/player.go
  12. 28
      zinx_app_demo/mmo_game/core/world_manager.go
  13. 54
      zinx_app_demo/mmo_game/pb/msg.pb.go
  14. 8
      zinx_app_demo/mmo_game/pb/msg.proto
  15. 21
      zinx_app_demo/mmo_game/server.go
  16. 21
      zlog/stdzlog.go
  17. 10
      zlog/zlogger.go
  18. 66
      znet/connection.go
  19. 42
      znet/connmanager.go
  20. 17
      znet/datapack.go
  21. 6
      znet/datapack_test.go
  22. 29
      znet/message.go
  23. 29
      znet/msghandler.go
  24. 9
      znet/request.go
  25. 12
      znet/router.go
  26. 42
      znet/server.go
  27. 9
      znet/server_test.go
  28. 46
      ztimer/timerscheduler.go
  29. 12
      ztimer/timerscheduler_test.go
  30. 40
      ztimer/timewheel.go

4
ziface/iconnection.go

@ -17,9 +17,9 @@ type IConnection interface {
RemoteAddr() net.Addr
//直接将Message数据发送数据给远程的TCP客户端(无缓冲)
SendMsg(msgId uint32, data []byte) error
SendMsg(msgID uint32, data []byte) error
//直接将Message数据发送给远程的TCP客户端(有缓冲)
SendBuffMsg(msgId uint32, data []byte) error
SendBuffMsg(msgID uint32, data []byte) error
//设置链接属性
SetProperty(key string, value interface{})

4
ziface/imessage.go

@ -5,10 +5,10 @@ package ziface
*/
type IMessage interface {
GetDataLen() uint32 //获取消息数据段长度
GetMsgId() uint32 //获取消息ID
GetMsgID() uint32 //获取消息ID
GetData() []byte //获取消息内容
SetMsgId(uint32) //设计消息ID
SetMsgID(uint32) //设计消息ID
SetData([]byte) //设计消息内容
SetDataLen(uint32) //设置消息数据段长度
}

2
ziface/imsghandler.go

@ -5,7 +5,7 @@ package ziface
*/
type IMsgHandle interface {
DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息
AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑
AddRouter(msgID uint32, router IRouter) //为消息添加具体的处理逻辑
StartWorkerPool() //启动worker工作池
SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理
}

2
ziface/iserver.go

@ -9,7 +9,7 @@ type IServer interface {
//开启业务服务方法
Serve()
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
AddRouter(msgId uint32, router IRouter)
AddRouter(msgID uint32, router IRouter)
//得到链接管理
GetConnMgr() IConnManager
//设置该Server的连接创建时Hook函数

13
zinx_app_demo/mmo_game/api/move.go

@ -2,6 +2,7 @@ package api
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/core"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
@ -23,18 +24,18 @@ func (*MoveApi) Handle(request ziface.IRequest) {
return
}
//2. 得知当前的消息是从哪个玩家传递来的,从连接属性pid中获取
pid, err := request.GetConnection().GetProperty("pid")
//2. 得知当前的消息是从哪个玩家传递来的,从连接属性pID中获取
pID, err := request.GetConnection().GetProperty("pID")
if err != nil {
fmt.Println("GetProperty pid error", err)
fmt.Println("GetProperty pID error", err)
request.GetConnection().Stop()
return
}
//fmt.Printf("user pid = %d , move(%f,%f,%f,%f)\n", pid, msg.X, msg.Y, msg.Z, msg.V)
//fmt.Printf("user pID = %d , move(%f,%f,%f,%f)\n", pID, msg.X, msg.Y, msg.Z, msg.V)
//3. 根据pid得到player对象
player := core.WorldMgrObj.GetPlayerByPid(pid.(int32))
//3. 根据pID得到player对象
player := core.WorldMgrObj.GetPlayerByPID(pID.(int32))
//4. 让player对象发起移动位置信息广播
player.UpdatePos(msg.X, msg.Y, msg.Z, msg.V)

11
zinx_app_demo/mmo_game/api/world_chat.go

@ -2,6 +2,7 @@ package api
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/core"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
@ -23,15 +24,15 @@ func (*WorldChatApi) Handle(request ziface.IRequest) {
return
}
//2. 得知当前的消息是从哪个玩家传递来的,从连接属性pid中获取
pid, err := request.GetConnection().GetProperty("pid")
//2. 得知当前的消息是从哪个玩家传递来的,从连接属性pID中获取
pID, err := request.GetConnection().GetProperty("pID")
if err != nil {
fmt.Println("GetProperty pid error", err)
fmt.Println("GetProperty pID error", err)
request.GetConnection().Stop()
return
}
//3. 根据pid得到player对象
player := core.WorldMgrObj.GetPlayerByPid(pid.(int32))
//3. 根据pID得到player对象
player := core.WorldMgrObj.GetPlayerByPID(pID.(int32))
//4. 让player对象发起聊天广播请求
player.Talk(msg.Content)

45
zinx_app_demo/mmo_game/client_AI_robot.go

@ -4,19 +4,20 @@ import (
"bytes"
"encoding/binary"
"fmt"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
"github.com/golang/protobuf/proto"
"io"
"math/rand"
"net"
"runtime"
"sync"
"time"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
"github.com/golang/protobuf/proto"
)
type Message struct {
Len uint32
MsgId uint32
MsgID uint32
Data []byte
}
@ -26,7 +27,7 @@ type TcpClient struct {
Y float32
Z float32
V float32
Pid int32
PID int32
isOnline chan bool
}
@ -40,8 +41,8 @@ func (this *TcpClient) Unpack(headdata []byte) (head *Message, err error) {
return nil, err
}
// 读取MsgId
if err = binary.Read(headBuf, binary.LittleEndian, &head.MsgId); err != nil {
// 读取MsgID
if err = binary.Read(headBuf, binary.LittleEndian, &head.MsgID); err != nil {
return nil, err
}
@ -53,14 +54,14 @@ func (this *TcpClient) Unpack(headdata []byte) (head *Message, err error) {
return head, nil
}
func (this *TcpClient) Pack(msgId uint32, dataBytes []byte) (out []byte, err error) {
func (this *TcpClient) Pack(msgID uint32, dataBytes []byte) (out []byte, err error) {
outbuff := bytes.NewBuffer([]byte{})
// 写Len
if err = binary.Write(outbuff, binary.LittleEndian, uint32(len(dataBytes))); err != nil {
return
}
// 写MsgId
if err = binary.Write(outbuff, binary.LittleEndian, msgId); err != nil {
// 写MsgID
if err = binary.Write(outbuff, binary.LittleEndian, msgID); err != nil {
return
}
@ -99,7 +100,7 @@ func (this *TcpClient) AIRobotAction() {
//随机获得动作
tp := rand.Intn(2)
if tp == 0 {
content := fmt.Sprintf("hello 我是player %d, 你是谁?", this.Pid)
content := fmt.Sprintf("hello 我是player %d, 你是谁?", this.PID)
msg := &pb.Talk{
Content: content,
}
@ -147,7 +148,7 @@ func (this *TcpClient) AIRobotAction() {
V: v,
}
fmt.Println(fmt.Sprintf("player ID: %d. Walking...", this.Pid))
fmt.Println(fmt.Sprintf("player ID: %d. Walking...", this.PID))
//发送移动MsgID:3的指令
this.SendMsg(3, msg)
}
@ -158,17 +159,17 @@ func (this *TcpClient) AIRobotAction() {
*/
func (this *TcpClient) DoMsg(msg *Message) {
//处理消息
//fmt.Println(fmt.Sprintf("msg id :%d, data len: %d", msg.MsgId, msg.Len))
if msg.MsgId == 1 {
//fmt.Println(fmt.Sprintf("msg ID :%d, data len: %d", msg.MsgID, msg.Len))
if msg.MsgID == 1 {
//服务器回执给客户端 分配ID
//解析proto
syncpid := &pb.SyncPid{}
_ = proto.Unmarshal(msg.Data, syncpid)
syncpID := &pb.SyncPID{}
_ = proto.Unmarshal(msg.Data, syncpID)
//给当前客户端ID进行赋值
this.Pid = syncpid.Pid
} else if msg.MsgId == 200 {
this.PID = syncpID.PID
} else if msg.MsgID == 200 {
//服务器回执客户端广播数据
//解析proto
@ -176,20 +177,20 @@ func (this *TcpClient) DoMsg(msg *Message) {
_ = proto.Unmarshal(msg.Data, bdata)
//初次玩家上线 广播位置消息
if bdata.Tp == 2 && bdata.Pid == this.Pid {
if bdata.Tp == 2 && bdata.PID == this.PID {
//本人
//更新客户端坐标
this.X = bdata.GetP().X
this.Y = bdata.GetP().Y
this.Z = bdata.GetP().Z
this.V = bdata.GetP().V
fmt.Println(fmt.Sprintf("player ID: %d online.. at(%f,%f,%f,%f)", bdata.Pid, this.X, this.Y, this.Z, this.V))
fmt.Println(fmt.Sprintf("player ID: %d online.. at(%f,%f,%f,%f)", bdata.PID, this.X, this.Y, this.Z, this.V))
//玩家已经成功上线
this.isOnline <- true
} else if bdata.Tp == 1 {
fmt.Println(fmt.Sprintf("世界聊天,玩家%d说的话是: %s", bdata.Pid, bdata.GetContent()))
fmt.Println(fmt.Sprintf("世界聊天,玩家%d说的话是: %s", bdata.PID, bdata.GetContent()))
}
}
}
@ -197,7 +198,7 @@ func (this *TcpClient) DoMsg(msg *Message) {
func (this *TcpClient) Start() {
go func() {
for {
//读取服务端发来的数据 ==》 SyncPid
//读取服务端发来的数据 ==》 SyncPID
//1.读取8字节
//第一次读取,读取数据头
headData := make([]byte, 8)
@ -249,7 +250,7 @@ func NewTcpClient(ip string, port int) *TcpClient {
client := &TcpClient{
conn: conn,
Pid: 0,
PID: 0,
X: 0,
Y: 0,
Z: 0,

92
zinx_app_demo/mmo_game/core/aoi.go

@ -21,7 +21,7 @@ type AOIManager struct {
MinY int //区域上边界坐标
MaxY int //区域下边界坐标
CntsY int //y方向的格子数量
grids map[int]*Grid //当前区域中都有哪些格子,key=格子ID, value=格子对象
grIDs map[int]*GrID //当前区域中都有哪些格子,key=格子ID, value=格子对象
}
/*
@ -35,22 +35,22 @@ func NewAOIManager(minX, maxX, cntsX, minY, maxY, cntsY int) *AOIManager {
MinY: minY,
MaxY: maxY,
CntsY: cntsY,
grids: make(map[int]*Grid),
grIDs: make(map[int]*GrID),
}
//给AOI初始化区域中所有的格子
for y := 0; y < cntsY; y++ {
for x := 0; x < cntsX; x++ {
//计算格子ID
//格子编号:id = idy *nx + idx (利用格子坐标得到格子编号)
gid := y*cntsX + x
//格子编号:ID = IDy *nx + IDx (利用格子坐标得到格子编号)
gID := y*cntsX + x
//初始化一个格子放在AOI中的map里,key是当前格子的ID
aoiMgr.grids[gid] = NewGrid(gid,
aoiMgr.MinX+x*aoiMgr.gridWidth(),
aoiMgr.MinX+(x+1)*aoiMgr.gridWidth(),
aoiMgr.MinY+y*aoiMgr.gridLength(),
aoiMgr.MinY+(y+1)*aoiMgr.gridLength())
aoiMgr.grIDs[gID] = NewGrID(gID,
aoiMgr.MinX+x*aoiMgr.grIDWIDth(),
aoiMgr.MinX+(x+1)*aoiMgr.grIDWIDth(),
aoiMgr.MinY+y*aoiMgr.grIDLength(),
aoiMgr.MinY+(y+1)*aoiMgr.grIDLength())
}
}
@ -58,114 +58,114 @@ func NewAOIManager(minX, maxX, cntsX, minY, maxY, cntsY int) *AOIManager {
}
//得到每个格子在x轴方向的宽度
func (m *AOIManager) gridWidth() int {
func (m *AOIManager) grIDWIDth() int {
return (m.MaxX - m.MinX) / m.CntsX
}
//得到每个格子在x轴方向的长度
func (m *AOIManager) gridLength() int {
func (m *AOIManager) grIDLength() int {
return (m.MaxY - m.MinY) / m.CntsY
}
//打印信息方法
func (m *AOIManager) String() string {
s := fmt.Sprintf("AOIManagr:\nminX:%d, maxX:%d, cntsX:%d, minY:%d, maxY:%d, cntsY:%d\n Grids in AOI Manager:\n",
s := fmt.Sprintf("AOIManagr:\nminX:%d, maxX:%d, cntsX:%d, minY:%d, maxY:%d, cntsY:%d\n GrIDs in AOI Manager:\n",
m.MinX, m.MaxX, m.CntsX, m.MinY, m.MaxY, m.CntsY)
for _, grid := range m.grids {
s += fmt.Sprintln(grid)
for _, grID := range m.grIDs {
s += fmt.Sprintln(grID)
}
return s
}
//根据格子的gID得到当前周边的九宫格信息
func (m *AOIManager) GetSurroundGridsByGid(gID int) (grids []*Grid) {
func (m *AOIManager) GetSurroundGrIDsByGID(gID int) (grIDs []*GrID) {
//判断gID是否存在
if _, ok := m.grids[gID]; !ok {
if _, ok := m.grIDs[gID]; !ok {
return
}
//将当前gid添加到九宫格中
grids = append(grids, m.grids[gID])
//将当前gID添加到九宫格中
grIDs = append(grIDs, m.grIDs[gID])
// 根据gID, 得到格子所在的坐标
x, y := gID%m.CntsX, gID/m.CntsX
// 新建一个临时存储周围格子的数组
surroundGid := make([]int, 0)
surroundGID := make([]int, 0)
// 新建8个方向向量: 左上: (-1, -1), 左中: (-1, 0), 左下: (-1,1), 中上: (0,-1), 中下: (0,1), 右上:(1, -1)
// 右中: (1, 0), 右下: (1, 1), 分别将这8个方向的方向向量按顺序写入x, y的分量数组
dx := []int{-1, -1, -1, 0, 0, 1, 1, 1}
dy := []int{-1, 0, 1, -1, 1, -1, 0, 1}
// 根据8个方向向量, 得到周围点的相对坐标, 挑选出没有越界的坐标, 将坐标转换为gid
// 根据8个方向向量, 得到周围点的相对坐标, 挑选出没有越界的坐标, 将坐标转换为gID
for i := 0; i < 8; i++ {
newX := x + dx[i]
newY := y + dy[i]
if newX >= 0 && newX < m.CntsX && newY >= 0 && newY < m.CntsY {
surroundGid = append(surroundGid, newY*m.CntsX+newX)
surroundGID = append(surroundGID, newY*m.CntsX+newX)
}
}
// 根据没有越界的gid, 得到格子信息
for _, gid := range surroundGid {
grids = append(grids, m.grids[gid])
// 根据没有越界的gID, 得到格子信息
for _, gID := range surroundGID {
grIDs = append(grIDs, m.grIDs[gID])
}
return
}
//通过横纵坐标获取对应的格子ID
func (m *AOIManager) GetGidByPos(x, y float32) int {
gx := (int(x) - m.MinX) / m.gridWidth()
gy := (int(y) - m.MinY) / m.gridLength()
func (m *AOIManager) GetGIDByPos(x, y float32) int {
gx := (int(x) - m.MinX) / m.grIDWIDth()
gy := (int(y) - m.MinY) / m.grIDLength()
return gy*m.CntsX + gx
}
//通过横纵坐标得到周边九宫格内的全部PlayerIDs
func (m *AOIManager) GetPidsByPos(x, y float32) (playerIDs []int) {
func (m *AOIManager) GetPIDsByPos(x, y float32) (playerIDs []int) {
//根据横纵坐标得到当前坐标属于哪个格子ID
gID := m.GetGidByPos(x, y)
gID := m.GetGIDByPos(x, y)
//根据格子ID得到周边九宫格的信息
grids := m.GetSurroundGridsByGid(gID)
for _, v := range grids {
grIDs := m.GetSurroundGrIDsByGID(gID)
for _, v := range grIDs {
playerIDs = append(playerIDs, v.GetPlyerIDs()...)
//fmt.Printf("===> grid ID : %d, pids : %v ====", v.GID, v.GetPlyerIDs())
//fmt.Printf("===> grID ID : %d, pIDs : %v ====", v.GID, v.GetPlyerIDs())
}
return
}
//通过GID获取当前格子的全部playerID
func (m *AOIManager) GetPidsByGid(gID int) (playerIDs []int) {
playerIDs = m.grids[gID].GetPlyerIDs()
func (m *AOIManager) GetPIDsByGID(gID int) (playerIDs []int) {
playerIDs = m.grIDs[gID].GetPlyerIDs()
return
}
//移除一个格子中的PlayerID
func (m *AOIManager) RemovePidFromGrid(pID, gID int) {
m.grids[gID].Remove(pID)
func (m *AOIManager) RemovePIDFromGrID(pID, gID int) {
m.grIDs[gID].Remove(pID)
}
//添加一个PlayerID到一个格子中
func (m *AOIManager) AddPidToGrid(pID, gID int) {
m.grids[gID].Add(pID)
func (m *AOIManager) AddPIDToGrID(pID, gID int) {
m.grIDs[gID].Add(pID)
}
//通过横纵坐标添加一个Player到一个格子中
func (m *AOIManager) AddToGridByPos(pID int, x, y float32) {
gID := m.GetGidByPos(x, y)
grid := m.grids[gID]
grid.Add(pID)
func (m *AOIManager) AddToGrIDByPos(pID int, x, y float32) {
gID := m.GetGIDByPos(x, y)
grID := m.grIDs[gID]
grID.Add(pID)
}
//通过横纵坐标把一个Player从对应的格子中删除
func (m *AOIManager) RemoveFromGridByPos(pID int, x, y float32) {
gID := m.GetGidByPos(x, y)
grid := m.grids[gID]
grid.Remove(pID)
func (m *AOIManager) RemoveFromGrIDByPos(pID int, x, y float32) {
gID := m.GetGIDByPos(x, y)
grID := m.grIDs[gID]
grID.Remove(pID)
}

16
zinx_app_demo/mmo_game/core/aoi_test.go

@ -10,18 +10,18 @@ func TestNewAOIManager(t *testing.T) {
fmt.Println(aoiMgr)
}
func TestAOIManagerSuroundGridsByGid(t *testing.T) {
func TestAOIManagerSuroundGrIDsByGID(t *testing.T) {
aoiMgr := NewAOIManager(0, 250, 5, 0, 250, 5)
for k, _ := range aoiMgr.grids {
for k, _ := range aoiMgr.grIDs {
//得到当前格子周边的九宫格
grids := aoiMgr.GetSurroundGridsByGid(k)
grIDs := aoiMgr.GetSurroundGrIDsByGID(k)
//得到九宫格所有的IDs
fmt.Println("gid : ", k, " grids len = ", len(grids))
gIDs := make([]int, 0, len(grids))
for _, grid := range grids {
gIDs = append(gIDs, grid.GID)
fmt.Println("gID : ", k, " grIDs len = ", len(grIDs))
gIDs := make([]int, 0, len(grIDs))
for _, grID := range grIDs {
gIDs = append(gIDs, grID.GID)
}
fmt.Printf("grid ID: %d, surrounding grid IDs are %v\n", k, gIDs)
fmt.Printf("grID ID: %d, surrounding grID IDs are %v\n", k, gIDs)
}
}

16
zinx_app_demo/mmo_game/core/grid.go

@ -8,7 +8,7 @@ import (
/*
一个地图中的格子类
*/
type Grid struct {
type GrID struct {
GID int //格子ID
MinX int //格子左边界坐标
MaxX int //格子右边界坐标
@ -19,8 +19,8 @@ type Grid struct {
}
//初始化一个格子
func NewGrid(gID, minX, maxX, minY, maxY int) *Grid {
return &Grid{
func NewGrID(gID, minX, maxX, minY, maxY int) *GrID {
return &GrID{
GID: gID,
MinX: minX,
MaxX: maxX,
@ -31,7 +31,7 @@ func NewGrid(gID, minX, maxX, minY, maxY int) *Grid {
}
//向当前格子中添加一个玩家
func (g *Grid) Add(playerID int) {
func (g *GrID) Add(playerID int) {
g.pIDLock.Lock()
defer g.pIDLock.Unlock()
@ -39,7 +39,7 @@ func (g *Grid) Add(playerID int) {
}
//从格子中删除一个玩家
func (g *Grid) Remove(playerID int) {
func (g *GrID) Remove(playerID int) {
g.pIDLock.Lock()
defer g.pIDLock.Unlock()
@ -47,7 +47,7 @@ func (g *Grid) Remove(playerID int) {
}
//得到当前格子中所有的玩家
func (g *Grid) GetPlyerIDs() (playerIDs []int) {
func (g *GrID) GetPlyerIDs() (playerIDs []int) {
g.pIDLock.RLock()
defer g.pIDLock.RUnlock()
@ -59,7 +59,7 @@ func (g *Grid) GetPlyerIDs() (playerIDs []int) {
}
//打印信息方法
func (g *Grid) String() string {
return fmt.Sprintf("Grid id: %d, minX:%d, maxX:%d, minY:%d, maxY:%d, playerIDs:%v",
func (g *GrID) String() string {
return fmt.Sprintf("GrID ID: %d, minX:%d, maxX:%d, minY:%d, maxY:%d, playerIDs:%v",
g.GID, g.MinX, g.MaxX, g.MinY, g.MaxY, g.playerIDs)
}

161
zinx_app_demo/mmo_game/core/player.go

@ -2,17 +2,18 @@ package core
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
"github.com/golang/protobuf/proto"
"math/rand"
"sync"
"time"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/pb"
"github.com/golang/protobuf/proto"
)
//玩家对象
type Player struct {
Pid int32 //玩家ID
PID int32 //玩家ID
Conn ziface.IConnection //当前玩家的连接
X float32 //平面x坐标
Y float32 //高度
@ -23,19 +24,19 @@ type Player struct {
/*
Player ID 生成器
*/
var PidGen int32 = 1 //用来生成玩家ID的计数器
var IdLock sync.Mutex //保护PidGen的互斥机制
var PIDGen int32 = 1 //用来生成玩家ID的计数器
var IDLock sync.Mutex //保护PIDGen的互斥机制
//创建一个玩家对象
func NewPlayer(conn ziface.IConnection) *Player {
//生成一个PID
IdLock.Lock()
id := PidGen
PidGen++
IdLock.Unlock()
IDLock.Lock()
ID := PIDGen
PIDGen++
IDLock.Unlock()
p := &Player{
Pid: id,
PID: ID,
Conn: conn,
X: float32(160 + rand.Intn(50)), //随机在160坐标点 基于X轴偏移若干坐标
Y: 0, //高度为0
@ -46,11 +47,11 @@ func NewPlayer(conn ziface.IConnection) *Player {
return p
}
//告知客户端pid,同步已经生成的玩家ID给客户端
func (p *Player) SyncPid() {
//组建MsgId0 proto数据
data := &pb.SyncPid{
Pid: p.Pid,
//告知客户端pID,同步已经生成的玩家ID给客户端
func (p *Player) SyncPID() {
//组建MsgID0 proto数据
data := &pb.SyncPID{
PID: p.PID,
}
//发送数据给客户端
@ -60,9 +61,9 @@ func (p *Player) SyncPid() {
//广播玩家自己的出生地点
func (p *Player) BroadCastStartPosition() {
//组建MsgId200 proto数据
//组建MsgID200 proto数据
msg := &pb.BroadCast{
Pid: p.Pid,
PID: p.PID,
Tp: 2, //TP2 代表广播坐标
Data: &pb.BroadCast_P{
P: &pb.Position{
@ -80,17 +81,17 @@ func (p *Player) BroadCastStartPosition() {
//给当前玩家周边的(九宫格内)玩家广播自己的位置,让他们显示自己
func (p *Player) SyncSurrounding() {
//1 根据自己的位置,获取周围九宫格内的玩家pid
pids := WorldMgrObj.AoiMgr.GetPidsByPos(p.X, p.Z)
//2 根据pid得到所有玩家对象
players := make([]*Player, 0, len(pids))
//1 根据自己的位置,获取周围九宫格内的玩家pID
pIDs := WorldMgrObj.AoiMgr.GetPIDsByPos(p.X, p.Z)
//2 根据pID得到所有玩家对象
players := make([]*Player, 0, len(pIDs))
//3 给这些玩家发送MsgID:200消息,让自己出现在对方视野中
for _, pid := range pids {
players = append(players, WorldMgrObj.GetPlayerByPid(int32(pid)))
for _, pID := range pIDs {
players = append(players, WorldMgrObj.GetPlayerByPID(int32(pID)))
}
//3.1 组建MsgId200 proto数据
//3.1 组建MsgID200 proto数据
msg := &pb.BroadCast{
Pid: p.Pid,
PID: p.PID,
Tp: 2, //TP2 代表广播坐标
Data: &pb.BroadCast_P{
P: &pb.Position{
@ -110,7 +111,7 @@ func (p *Player) SyncSurrounding() {
playersData := make([]*pb.Player, 0, len(players))
for _, player := range players {
p := &pb.Player{
Pid: player.Pid,
PID: player.PID,
P: &pb.Position{
X: player.X,
Y: player.Y,
@ -132,9 +133,9 @@ func (p *Player) SyncSurrounding() {
//广播玩家聊天
func (p *Player) Talk(content string) {
//1. 组建MsgId200 proto数据
//1. 组建MsgID200 proto数据
msg := &pb.BroadCast{
Pid: p.Pid,
PID: p.PID,
Tp: 1, //TP 1 代表聊天广播
Data: &pb.BroadCast_Content{
Content: content,
@ -144,7 +145,7 @@ func (p *Player) Talk(content string) {
//2. 得到当前世界所有的在线玩家
players := WorldMgrObj.GetAllPlayers()
//3. 向所有的玩家发送MsgId:200消息
//3. 向所有的玩家发送MsgID:200消息
for _, player := range players {
player.SendMsg(200, msg)
}
@ -154,10 +155,10 @@ func (p *Player) Talk(content string) {
func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) {
//触发消失视野和添加视野业务
//计算旧格子gid
oldGid := WorldMgrObj.AoiMgr.GetGidByPos(p.X, p.Z)
//计算新格子gid
newGid := WorldMgrObj.AoiMgr.GetGidByPos(x, z)
//计算旧格子gID
oldGID := WorldMgrObj.AoiMgr.GetGIDByPos(p.X, p.Z)
//计算新格子gID
newGID := WorldMgrObj.AoiMgr.GetGIDByPos(x, z)
//更新玩家的位置信息
p.X = x
@ -165,19 +166,19 @@ func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) {
p.Z = z
p.V = v
if oldGid != newGid {
if oldGID != newGID {
//触发gird切换
//把pid从就的aoi格子中删除
WorldMgrObj.AoiMgr.RemovePidFromGrid(int(p.Pid), oldGid)
//把pid添加到新的aoi格子中去
WorldMgrObj.AoiMgr.AddPidToGrid(int(p.Pid), newGid)
//把pID从就的aoi格子中删除
WorldMgrObj.AoiMgr.RemovePIDFromGrID(int(p.PID), oldGID)
//把pID添加到新的aoi格子中去
WorldMgrObj.AoiMgr.AddPIDToGrID(int(p.PID), newGID)
_ = p.OnExchangeAoiGrid(oldGid, newGid)
_ = p.OnExchangeAoiGrID(oldGID, newGID)
}
//组装protobuf协议,发送位置给周围玩家
msg := &pb.BroadCast{
Pid: p.Pid,
PID: p.PID,
Tp: 4, //4- 移动之后的坐标信息
Data: &pb.BroadCast_P{
P: &pb.Position{
@ -197,47 +198,47 @@ func (p *Player) UpdatePos(x float32, y float32, z float32, v float32) {
}
}
func (p *Player) OnExchangeAoiGrid(oldGid, newGid int) error {
func (p *Player) OnExchangeAoiGrID(oldGID, newGID int) error {
//获取就的九宫格成员
oldGrids := WorldMgrObj.AoiMgr.GetSurroundGridsByGid(oldGid)
oldGrIDs := WorldMgrObj.AoiMgr.GetSurroundGrIDsByGID(oldGID)
//为旧的九宫格成员建立哈希表,用来快速查找
oldGridsMap := make(map[int]bool, len(oldGrids))
for _, grid := range oldGrids {
oldGridsMap[grid.GID] = true
oldGrIDsMap := make(map[int]bool, len(oldGrIDs))
for _, grID := range oldGrIDs {
oldGrIDsMap[grID.GID] = true
}
//获取新的九宫格成员
newGrids := WorldMgrObj.AoiMgr.GetSurroundGridsByGid(newGid)
newGrIDs := WorldMgrObj.AoiMgr.GetSurroundGrIDsByGID(newGID)
//为新的九宫格成员建立哈希表,用来快速查找
newGridsMap := make(map[int]bool, len(newGrids))
for _, grid := range newGrids {
newGridsMap[grid.GID] = true
newGrIDsMap := make(map[int]bool, len(newGrIDs))
for _, grID := range newGrIDs {
newGrIDsMap[grID.GID] = true
}
//------ > 处理视野消失 <-------
offlineMsg := &pb.SyncPid{
Pid: p.Pid,
offlineMsg := &pb.SyncPID{
PID: p.PID,
}
//找到在旧的九宫格中出现,但是在新的九宫格中没有出现的格子
leavingGrids := make([]*Grid, 0)
for _, grid := range oldGrids {
if _, ok := newGridsMap[grid.GID]; !ok {
leavingGrids = append(leavingGrids, grid)
leavingGrIDs := make([]*GrID, 0)
for _, grID := range oldGrIDs {
if _, ok := newGrIDsMap[grID.GID]; !ok {
leavingGrIDs = append(leavingGrIDs, grID)
}
}
//获取需要消失的格子中的全部玩家
for _, grid := range leavingGrids {
players := WorldMgrObj.GetPlayersByGid(grid.GID)
for _, grID := range leavingGrIDs {
players := WorldMgrObj.GetPlayersByGID(grID.GID)
for _, player := range players {
//让自己在其他玩家的客户端中消失
player.SendMsg(201, offlineMsg)
//将其他玩家信息 在自己的客户端中消失
anotherOfflineMsg := &pb.SyncPid{
Pid: player.Pid,
anotherOfflineMsg := &pb.SyncPID{
PID: player.PID,
}
p.SendMsg(201, anotherOfflineMsg)
time.Sleep(200 * time.Millisecond)
@ -247,15 +248,15 @@ func (p *Player) OnExchangeAoiGrid(oldGid, newGid int) error {
//------ > 处理视野出现 <-------
//找到在新的九宫格内出现,但是没有在就的九宫格内出现的格子
enteringGrids := make([]*Grid, 0)
for _, grid := range newGrids {
if _, ok := oldGridsMap[grid.GID]; !ok {
enteringGrids = append(enteringGrids, grid)
enteringGrIDs := make([]*GrID, 0)
for _, grID := range newGrIDs {
if _, ok := oldGrIDsMap[grID.GID]; !ok {
enteringGrIDs = append(enteringGrIDs, grID)
}
}
onlineMsg := &pb.BroadCast{
Pid: p.Pid,
PID: p.PID,
Tp: 2,
Data: &pb.BroadCast_P{
P: &pb.Position{
@ -268,8 +269,8 @@ func (p *Player) OnExchangeAoiGrid(oldGid, newGid int) error {
}
//获取需要显示格子的全部玩家
for _, grid := range enteringGrids {
players := WorldMgrObj.GetPlayersByGid(grid.GID)
for _, grID := range enteringGrIDs {
players := WorldMgrObj.GetPlayersByGID(grID.GID)
for _, player := range players {
//让自己出现在其他人视野中
@ -277,7 +278,7 @@ func (p *Player) OnExchangeAoiGrid(oldGid, newGid int) error {
//让其他人出现在自己的视野中
anotherOnlineMsg := &pb.BroadCast{
Pid: player.Pid,
PID: player.PID,
Tp: 2,
Data: &pb.BroadCast_P{
P: &pb.Position{
@ -299,13 +300,13 @@ func (p *Player) OnExchangeAoiGrid(oldGid, newGid int) error {
//获得当前玩家的AOI周边玩家信息
func (p *Player) GetSurroundingPlayers() []*Player {
//得到当前AOI区域的所有pid
pids := WorldMgrObj.AoiMgr.GetPidsByPos(p.X, p.Z)
//得到当前AOI区域的所有pID
pIDs := WorldMgrObj.AoiMgr.GetPIDsByPos(p.X, p.Z)
//将所有pid对应的Player放到Player切片中
players := make([]*Player, 0, len(pids))
for _, pid := range pids {
players = append(players, WorldMgrObj.GetPlayerByPid(int32(pid)))
//将所有pID对应的Player放到Player切片中
players := make([]*Player, 0, len(pIDs))
for _, pID := range pIDs {
players = append(players, WorldMgrObj.GetPlayerByPID(int32(pID)))
}
return players
@ -317,8 +318,8 @@ func (p *Player) LostConnection() {
players := p.GetSurroundingPlayers()
//2 封装MsgID:201消息
msg := &pb.SyncPid{
Pid: p.Pid,
msg := &pb.SyncPID{
PID: p.PID,
}
//3 向周围玩家发送消息
@ -327,15 +328,15 @@ func (p *Player) LostConnection() {
}
//4 世界管理器将当前玩家从AOI中摘除
WorldMgrObj.AoiMgr.RemoveFromGridByPos(int(p.Pid), p.X, p.Z)
WorldMgrObj.RemovePlayerByPid(p.Pid)
WorldMgrObj.AoiMgr.RemoveFromGrIDByPos(int(p.PID), p.X, p.Z)
WorldMgrObj.RemovePlayerByPID(p.PID)
}
/*
发送消息给客户端
主要是将pb的protobuf数据序列化之后发送
*/
func (p *Player) SendMsg(msgId uint32, data proto.Message) {
func (p *Player) SendMsg(msgID uint32, data proto.Message) {
//fmt.Printf("before Marshal data = %+v\n", data)
//将proto Message结构体序列化
msg, err := proto.Marshal(data)
@ -351,7 +352,7 @@ func (p *Player) SendMsg(msgId uint32, data proto.Message) {
}
//调用Zinx框架的SendMsg发包
if err := p.Conn.SendMsg(msgId, msg); err != nil {
if err := p.Conn.SendMsg(msgID, msg); err != nil {
fmt.Println("Player SendMsg error !")
return
}

28
zinx_app_demo/mmo_game/core/world_manager.go

@ -28,26 +28,26 @@ func init() {
func (wm *WorldManager) AddPlayer(player *Player) {
//将player添加到 世界管理器中
wm.pLock.Lock()
wm.Players[player.Pid] = player
wm.Players[player.PID] = player
wm.pLock.Unlock()
//将player 添加到AOI网络规划中
wm.AoiMgr.AddToGridByPos(int(player.Pid), player.X, player.Z)
wm.AoiMgr.AddToGrIDByPos(int(player.PID), player.X, player.Z)
}
//从玩家信息表中移除一个玩家
func (wm *WorldManager) RemovePlayerByPid(pid int32) {
func (wm *WorldManager) RemovePlayerByPID(pID int32) {
wm.pLock.Lock()
delete(wm.Players, pid)
delete(wm.Players, pID)
wm.pLock.Unlock()
}
//通过玩家ID 获取对应玩家信息
func (wm *WorldManager) GetPlayerByPid(pid int32) *Player {
func (wm *WorldManager) GetPlayerByPID(pID int32) *Player {
wm.pLock.RLock()
defer wm.pLock.RUnlock()
return wm.Players[pid]
return wm.Players[pID]
}
//获取所有玩家的信息
@ -67,16 +67,16 @@ func (wm *WorldManager) GetAllPlayers() []*Player {
return players
}
//获取指定gid中的所有player信息
func (wm *WorldManager) GetPlayersByGid(gid int) []*Player {
//通过gid获取 对应 格子中的所有pid
pids := wm.AoiMgr.grids[gid].GetPlyerIDs()
//获取指定gID中的所有player信息
func (wm *WorldManager) GetPlayersByGID(gID int) []*Player {
//通过gID获取 对应 格子中的所有pID
pIDs := wm.AoiMgr.grIDs[gID].GetPlyerIDs()
//通过pid找到对应的player对象
players := make([]*Player, 0, len(pids))
//通过pID找到对应的player对象
players := make([]*Player, 0, len(pIDs))
wm.pLock.RLock()
for _, pid := range pids {
players = append(players, wm.Players[int32(pid)])
for _, pID := range pIDs {
players = append(players, wm.Players[int32(pID)])
}
wm.pLock.RUnlock()

54
zinx_app_demo/mmo_game/pb/msg.pb.go

@ -21,41 +21,41 @@ var _ = math.Inf
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
//同步客户端玩家ID
type SyncPid struct {
Pid int32 `protobuf:"varint,1,opt,name=Pid,proto3" json:"Pid,omitempty"`
type SyncPID struct {
PID int32 `protobuf:"varint,1,opt,name=PID,proto3" json:"PID,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SyncPid) Reset() { *m = SyncPid{} }
func (m *SyncPid) String() string { return proto.CompactTextString(m) }
func (*SyncPid) ProtoMessage() {}
func (*SyncPid) Descriptor() ([]byte, []int) {
func (m *SyncPID) Reset() { *m = SyncPID{} }
func (m *SyncPID) String() string { return proto.CompactTextString(m) }
func (*SyncPID) ProtoMessage() {}
func (*SyncPID) Descriptor() ([]byte, []int) {
return fileDescriptor_c06e4cca6c2cc899, []int{0}
}
func (m *SyncPid) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncPid.Unmarshal(m, b)
func (m *SyncPID) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SyncPID.Unmarshal(m, b)
}
func (m *SyncPid) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncPid.Marshal(b, m, deterministic)
func (m *SyncPID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SyncPID.Marshal(b, m, deterministic)
}
func (m *SyncPid) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncPid.Merge(m, src)
func (m *SyncPID) XXX_Merge(src proto.Message) {
xxx_messageInfo_SyncPID.Merge(m, src)
}
func (m *SyncPid) XXX_Size() int {
return xxx_messageInfo_SyncPid.Size(m)
func (m *SyncPID) XXX_Size() int {
return xxx_messageInfo_SyncPID.Size(m)
}
func (m *SyncPid) XXX_DiscardUnknown() {
xxx_messageInfo_SyncPid.DiscardUnknown(m)
func (m *SyncPID) XXX_DiscardUnknown() {
xxx_messageInfo_SyncPID.DiscardUnknown(m)
}
var xxx_messageInfo_SyncPid proto.InternalMessageInfo
var xxx_messageInfo_SyncPID proto.InternalMessageInfo
func (m *SyncPid) GetPid() int32 {
func (m *SyncPID) GetPID() int32 {
if m != nil {
return m.Pid
return m.PID
}
return 0
}
@ -126,9 +126,9 @@ func (m *Position) GetV() float32 {
//玩家广播数据
type BroadCast struct {
Pid int32 `protobuf:"varint,1,opt,name=Pid,proto3" json:"Pid,omitempty"`
PID int32 `protobuf:"varint,1,opt,name=PID,proto3" json:"PID,omitempty"`
Tp int32 `protobuf:"varint,2,opt,name=Tp,proto3" json:"Tp,omitempty"`
// Types that are valid to be assigned to Data:
// Types that are valID to be assigned to Data:
// *BroadCast_Content
// *BroadCast_P
// *BroadCast_ActionData
@ -163,9 +163,9 @@ func (m *BroadCast) XXX_DiscardUnknown() {
var xxx_messageInfo_BroadCast proto.InternalMessageInfo
func (m *BroadCast) GetPid() int32 {
func (m *BroadCast) GetPID() int32 {
if m != nil {
return m.Pid
return m.PID
}
return 0
}
@ -278,7 +278,7 @@ func (m *Talk) GetContent() string {
//玩家信息
type Player struct {
Pid int32 `protobuf:"varint,1,opt,name=Pid,proto3" json:"Pid,omitempty"`
PID int32 `protobuf:"varint,1,opt,name=PID,proto3" json:"PID,omitempty"`
P *Position `protobuf:"bytes,2,opt,name=P,proto3" json:"P,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
@ -310,9 +310,9 @@ func (m *Player) XXX_DiscardUnknown() {
var xxx_messageInfo_Player proto.InternalMessageInfo
func (m *Player) GetPid() int32 {
func (m *Player) GetPID() int32 {
if m != nil {
return m.Pid
return m.PID
}
return 0
}
@ -365,7 +365,7 @@ func (m *SyncPlayers) GetPs() []*Player {
}
func init() {
proto.RegisterType((*SyncPid)(nil), "pb.SyncPid")
proto.RegisterType((*SyncPID)(nil), "pb.SyncPID")
proto.RegisterType((*Position)(nil), "pb.Position")
proto.RegisterType((*BroadCast)(nil), "pb.BroadCast")
proto.RegisterType((*Talk)(nil), "pb.Talk")

8
zinx_app_demo/mmo_game/pb/msg.proto

@ -3,8 +3,8 @@ package pb; //当前包名
option csharp_namespace="Pb"; //C#
//ID
message SyncPid{
int32 Pid=1;
message SyncPID{
int32 PID=1;
}
//
@ -17,7 +17,7 @@ message Position{
//广
message BroadCast{
int32 Pid=1;
int32 PID=1;
int32 Tp=2; //1- 2- 3 4
oneof Data {
string Content=3; //
@ -33,7 +33,7 @@ message Talk{
//
message Player{
int32 Pid=1;
int32 PID=1;
Position P=2;
}

21
zinx_app_demo/mmo_game/server.go

@ -2,6 +2,7 @@ package main
import (
"fmt"
"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/api"
"github.com/aceld/zinx/zinx_app_demo/mmo_game/core"
@ -14,7 +15,7 @@ func OnConnecionAdd(conn ziface.IConnection) {
player := core.NewPlayer(conn)
//同步当前的PlayerID给客户端, 走MsgID:1 消息
player.SyncPid()
player.SyncPID()
//同步当前玩家的初始化坐标信息给客户端,走MsgID:200消息
player.BroadCastStartPosition()
@ -22,29 +23,29 @@ func OnConnecionAdd(conn ziface.IConnection) {
//将当前新上线玩家添加到worldManager中
core.WorldMgrObj.AddPlayer(player)
//将该连接绑定属性Pid
conn.SetProperty("pid", player.Pid)
//将该连接绑定属性PID
conn.SetProperty("pID", player.PID)
//同步周边玩家上线信息,与现实周边玩家信息
player.SyncSurrounding()
fmt.Println("=====> Player pidId = ", player.Pid, " arrived ====")
fmt.Println("=====> Player pIDID = ", player.PID, " arrived ====")
}
//当客户端断开连接的时候的hook函数
func OnConnectionLost(conn ziface.IConnection) {
//获取当前连接的Pid属性
pid, _ := conn.GetProperty("pid")
//获取当前连接的PID属性
pID, _ := conn.GetProperty("pID")
//根据pid获取对应的玩家对象
player := core.WorldMgrObj.GetPlayerByPid(pid.(int32))
//根据pID获取对应的玩家对象
player := core.WorldMgrObj.GetPlayerByPID(pID.(int32))
//触发玩家下线业务
if pid != nil {
if pID != nil {
player.LostConnection()
}
fmt.Println("====> Player ", pid, " left =====")
fmt.Println("====> Player ", pID, " left =====")
}

21
zlog/stdzlog.go

@ -7,57 +7,60 @@ package zlog
import "os"
//StdZinxLog 创建全局log
var StdZinxLog = NewZinxLog(os.Stderr, "", BitDefault)
//获取StdZinxLog 标记位
//Flags 获取StdZinxLog 标记位
func Flags() int {
return StdZinxLog.Flags()
}
//设置StdZinxLog标记位
//ResetFlags 设置StdZinxLog标记位
func ResetFlags(flag int) {
StdZinxLog.ResetFlags(flag)
}
//添加flag标记
//AddFlag 添加flag标记
func AddFlag(flag int) {
StdZinxLog.AddFlag(flag)
}
//设置StdZinxLog 日志头前缀
//SetPrefix 设置StdZinxLog 日志头前缀
func SetPrefix(prefix string) {
StdZinxLog.SetPrefix(prefix)
}
//设置StdZinxLog绑定的日志文件
//SetLogFile 设置StdZinxLog绑定的日志文件
func SetLogFile(fileDir string, fileName string) {
StdZinxLog.SetLogFile(fileDir, fileName)
}
//设置关闭debug
//CloseDebug 设置关闭debug
func CloseDebug() {
StdZinxLog.CloseDebug()
}
//设置打开debug
//OpenDebug 设置打开debug
func OpenDebug() {
StdZinxLog.OpenDebug()
}
// ====> Debug <====
//Debugf ====> Debug <====
func Debugf(format string, v ...interface{}) {
StdZinxLog.Debugf(format, v...)
}
//Debug Debug
func Debug(v ...interface{}) {
StdZinxLog.Debug(v...)
}
// ====> Info <====
//Infof ====> Info <====
func Infof(format string, v ...interface{}) {
StdZinxLog.Infof(format, v...)
}
//Info -
func Info(v ...interface{}) {
StdZinxLog.Info(v...)
}

10
zlog/zlogger.go

@ -372,9 +372,9 @@ func mkdirLog(dir string) (e error) {
//将一个整形转换成一个固定长度的字符串,字符串宽度应该是大于0的
//要确保buffer是有容量空间的
func itoa(buf *bytes.Buffer, i int, wid int) {
func itoa(buf *bytes.Buffer, i int, wID int) {
var u uint = uint(i)
if u == 0 && wid <= 1 {
if u == 0 && wID <= 1 {
buf.WriteByte('0')
return
}
@ -382,13 +382,13 @@ func itoa(buf *bytes.Buffer, i int, wid int) {
// Assemble decimal in reverse order.
var b [32]byte
bp := len(b)
for ; u > 0 || wid > 0; u /= 10 {
for ; u > 0 || wID > 0; u /= 10 {
bp--
wid--
wID--
b[bp] = byte(u%10) + '0'
}
// avoid slicing b to avoid an allocation.
// avoID slicing b to avoID an allocation.
for bp < len(b) {
buf.WriteByte(b[bp])
bp++

66
znet/connection.go

@ -12,14 +12,15 @@ import (
"github.com/aceld/zinx/ziface"
)
//Connection 链接
type Connection struct {
//当前Conn属于哪个Server
TcpServer ziface.IServer
TCPServer ziface.IServer
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//消息管理MsgId和对应处理方法的消息管理模块
//消息管理MsgID和对应处理方法的消息管理模块
MsgHandler ziface.IMsgHandle
//告知该链接已经退出/停止的channel
ctx context.Context
@ -38,11 +39,11 @@ type Connection struct {
isClosed bool
}
//创建连接的方法
//NewConntion 创建连接的方法
func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
//初始化Conn属性
c := &Connection{
TcpServer: server,
TCPServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
@ -53,13 +54,11 @@ func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHan
}
//将新创建的Conn添加到链接管理中
c.TcpServer.GetConnMgr().Add(c)
c.TCPServer.GetConnMgr().Add(c)
return c
}
/*
写消息Goroutine 用户将数据发送给客户端
*/
//StartWriter 写消息Goroutine, 用户将数据发送给客户端
func (c *Connection) StartWriter() {
fmt.Println("[Writer Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
@ -90,9 +89,7 @@ func (c *Connection) StartWriter() {
}
}
/*
读消息Goroutine用于从客户端中读取数据
*/
//StartReader 读消息Goroutine,用于从客户端中读取数据
func (c *Connection) StartReader() {
fmt.Println("[Reader Goroutine is running]")
defer fmt.Println(c.RemoteAddr().String(), "[conn Reader exit!]")
@ -114,7 +111,7 @@ func (c *Connection) StartReader() {
}
//fmt.Printf("read headData %+v\n", headData)
//拆包,得到msgid 和 datalen 放在msg中
//拆包,得到msgID 和 datalen 放在msg中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error ", err)
@ -149,7 +146,7 @@ func (c *Connection) StartReader() {
}
}
//启动连接,让当前连接开始工作
//Start 启动连接,让当前连接开始工作
func (c *Connection) Start() {
c.ctx, c.cancel = context.WithCancel(context.Background())
//1 开启用户从客户端读取数据流程的Goroutine
@ -157,10 +154,10 @@ func (c *Connection) Start() {
//2 开启用于写回客户端数据流程的Goroutine
go c.StartWriter()
//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
c.TcpServer.CallOnConnStart(c)
c.TCPServer.CallOnConnStart(c)
}
//停止连接,结束当前连接状态M
//Stop 停止连接,结束当前连接状态M
func (c *Connection) Stop() {
fmt.Println("Conn Stop()...ConnID = ", c.ConnID)
@ -168,13 +165,12 @@ func (c *Connection) Stop() {
defer c.Unlock()
//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
c.TcpServer.CallOnConnStop(c)
c.TCPServer.CallOnConnStop(c)
//如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
// 关闭socket链接
c.Conn.Close()
@ -182,29 +178,32 @@ func (c *Connection) Stop() {
c.cancel()
//将链接从连接管理器中删除
c.TcpServer.GetConnMgr().Remove(c)
c.TCPServer.GetConnMgr().Remove(c)
//关闭该链接全部管道
close(c.msgBuffChan)
//设置标志位
c.isClosed = true
}
//从当前连接获取原始的socket TCPConn
//GetTCPConnection 从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接ID
//GetConnID 获取当前连接ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
//获取远程客户端地址信息
//RemoteAddr 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
//SendMsg 直接将Message数据发送数据给远程的TCP客户端
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
c.RLock()
if c.isClosed == true {
c.RUnlock()
@ -214,9 +213,9 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
msg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
fmt.Println("Pack error msg ID = ", msgID)
return errors.New("Pack error msg ")
}
@ -226,7 +225,8 @@ func (c *Connection) SendMsg(msgId uint32, data []byte) error {
return nil
}
func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
//SendBuffMsg 发生BuffMsg
func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
c.RLock()
if c.isClosed == true {
c.RUnlock()
@ -236,9 +236,9 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
//将data封包,并且发送
dp := NewDataPack()
msg, err := dp.Pack(NewMsgPackage(msgId, data))
msg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg id = ", msgId)
fmt.Println("Pack error msg ID = ", msgID)
return errors.New("Pack error msg ")
}
@ -248,7 +248,7 @@ func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
return nil
}
//设置链接属性
//SetProperty 设置链接属性
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
@ -256,19 +256,19 @@ func (c *Connection) SetProperty(key string, value interface{}) {
c.property[key] = value
}
//获取链接属性
//GetProperty 获取链接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("no property found")
}
return nil, errors.New("no property found")
}
//移除链接属性
//RemoveProperty 移除链接属性
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()

42
znet/connmanager.go

@ -8,24 +8,20 @@ import (
"github.com/aceld/zinx/ziface"
)
/*
连接管理模块
*/
//ConnManager 连接管理模块
type ConnManager struct {
connections map[uint32]ziface.IConnection //管理的连接信息
connLock sync.RWMutex //读写连接的读写锁
}
/*
创建一个链接管理
*/
//NewConnManager 创建一个链接管理
func NewConnManager() *ConnManager {
return &ConnManager{
connections: make(map[uint32]ziface.IConnection),
}
}
//添加链接
//Add 添加链接
func (connMgr *ConnManager) Add(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
@ -37,7 +33,7 @@ func (connMgr *ConnManager) Add(conn ziface.IConnection) {
fmt.Println("connection add to ConnManager successfully: conn num = ", connMgr.Len())
}
//删除连接
//Remove 删除连接
func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
@ -49,7 +45,7 @@ func (connMgr *ConnManager) Remove(conn ziface.IConnection) {
fmt.Println("connection Remove ConnID=", conn.GetConnID(), " successfully: conn num = ", connMgr.Len())
}
//利用ConnID获取链接
//Get 利用ConnID获取链接
func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
//保护共享资源Map 加读锁
connMgr.connLock.RLock()
@ -57,17 +53,18 @@ func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) {
if conn, ok := connMgr.connections[connID]; ok {
return conn, nil
} else {
return nil, errors.New("connection not found")
}
return nil, errors.New("connection not found")
}
//获取当前连接
//Len 获取当前连接
func (connMgr *ConnManager) Len() int {
return len(connMgr.connections)
}
//清除并停止所有连接
//ClearConn 清除并停止所有连接
func (connMgr *ConnManager) ClearConn() {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
@ -83,3 +80,22 @@ func (connMgr *ConnManager) ClearConn() {
fmt.Println("Clear All Connections successfully: conn num = ", connMgr.Len())
}
//ClearOneConn 利用ConnID获取一个链接 并且删除
func (connMgr *ConnManager) ClearOneConn(connID uint32) {
//保护共享资源Map 加写锁
connMgr.connLock.Lock()
defer connMgr.connLock.Unlock()
if conn, ok := connMgr.connections[connID]; !ok {
//停止
conn.Stop()
//删除
delete(connMgr.connections, connID)
fmt.Println("Clear Connections ID: ", connID, "succeed")
return
}
fmt.Println("Clear Connections ID: ", connID, "err")
return
}

17
znet/datapack.go

@ -4,25 +4,26 @@ import (
"bytes"
"encoding/binary"
"errors"
"github.com/aceld/zinx/utils"
"github.com/aceld/zinx/ziface"
)
//封包拆包类实例,暂时不需要成员
//DataPack 封包拆包类实例,暂时不需要成员
type DataPack struct{}
//封包拆包实例初始化方法
//NewDataPack 封包拆包实例初始化方法
func NewDataPack() *DataPack {
return &DataPack{}
}
//获取包头长度方法
//GetHeadLen 获取包头长度方法
func (dp *DataPack) GetHeadLen() uint32 {
//Id uint32(4字节) + DataLen uint32(4字节)
//ID uint32(4字节) + DataLen uint32(4字节)
return 8
}
//封包方法(压缩数据)
//Pack 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
//创建一个存放bytes字节的缓冲
dataBuff := bytes.NewBuffer([]byte{})
@ -33,7 +34,7 @@ func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
}
//写msgID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil {
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgID()); err != nil {
return nil, err
}
@ -45,7 +46,7 @@ func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
return dataBuff.Bytes(), nil
}
//拆包方法(解压数据)
//Unpack 拆包方法(解压数据)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
//创建一个从输入二进制数据的ioReader
dataBuff := bytes.NewReader(binaryData)
@ -59,7 +60,7 @@ func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
}
//读msgID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil {
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.ID); err != nil {
return nil, err
}

6
znet/datapack_test.go

@ -58,7 +58,7 @@ func TestDataPack(t *testing.T) {
return
}
fmt.Println("==> Recv Msg: ID=", msg.Id, ", len=", msg.DataLen, ", data=", string(msg.Data))
fmt.Println("==> Recv Msg: ID=", msg.ID, ", len=", msg.DataLen, ", data=", string(msg.Data))
}
}
}(conn)
@ -78,7 +78,7 @@ func TestDataPack(t *testing.T) {
//封装一个msg1包
msg1 := &Message{
Id: 0,
ID: 0,
DataLen: 5,
Data: []byte{'h', 'e', 'l', 'l', 'o'},
}
@ -90,7 +90,7 @@ func TestDataPack(t *testing.T) {
}
msg2 := &Message{
Id: 1,
ID: 1,
DataLen: 7,
Data: []byte{'w', 'o', 'r', 'l', 'd', '!', '!'},
}

29
znet/message.go

@ -1,46 +1,47 @@
package znet
//Message 消息
type Message struct {
DataLen uint32 //消息的长度
Id uint32 //消息的ID
ID uint32 //消息的ID
Data []byte //消息的内容
}
//创建一个Message消息包
func NewMsgPackage(id uint32, data []byte) *Message {
//NewMsgPackage 创建一个Message消息包
func NewMsgPackage(ID uint32, data []byte) *Message {
return &Message{
DataLen: uint32(len(data)),
Id: id,
ID: ID,
Data: data,
}
}
//获取消息数据段长度
//GetDataLen 获取消息数据段长度
func (msg *Message) GetDataLen() uint32 {
return msg.DataLen
}
//获取消息ID
func (msg *Message) GetMsgId() uint32 {
return msg.Id
//GetMsgID 获取消息ID
func (msg *Message) GetMsgID() uint32 {
return msg.ID
}
//获取消息内容
//GetData 获取消息内容
func (msg *Message) GetData() []byte {
return msg.Data
}
//设置消息数据段长度
//SetDataLen 设置消息数据段长度
func (msg *Message) SetDataLen(len uint32) {
msg.DataLen = len
}
//设计消息ID
func (msg *Message) SetMsgId(msgId uint32) {
msg.Id = msgId
//SetMsgID 设计消息ID
func (msg *Message) SetMsgID(msgID uint32) {
msg.ID = msgID
}
//设计消息内容
//SetData 设计消息内容
func (msg *Message) SetData(data []byte) {
msg.Data = data
}

29
znet/msghandler.go

@ -2,17 +2,20 @@ package znet
import (
"fmt"
"strconv"
"github.com/aceld/zinx/utils"
"github.com/aceld/zinx/ziface"
"strconv"
)
// MsgHandle -
type MsgHandle struct {
Apis map[uint32]ziface.IRouter //存放每个MsgId 所对应的处理方法的map属性
Apis map[uint32]ziface.IRouter //存放每个MsgID 所对应的处理方法的map属性
WorkerPoolSize uint32 //业务工作Worker池的数量
TaskQueue []chan ziface.IRequest //Worker负责取任务的消息队列
}
//NewMsgHandle 创建MsgHandle
func NewMsgHandle() *MsgHandle {
return &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
@ -22,7 +25,7 @@ func NewMsgHandle() *MsgHandle {
}
}
//将消息交给TaskQueue,由worker进行处理
//SendMsgToTaskQueue 将消息交给TaskQueue,由worker进行处理
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
//根据ConnID来分配当前的连接应该由哪个worker负责处理
//轮询的平均分配法则
@ -34,11 +37,11 @@ func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
mh.TaskQueue[workerID] <- request
}
//马上以非阻塞方式处理消息
//DoMsgHandler 马上以非阻塞方式处理消息
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler, ok := mh.Apis[request.GetMsgID()]
if !ok {
fmt.Println("api msgId = ", request.GetMsgID(), " is not FOUND!")
fmt.Println("api msgID = ", request.GetMsgID(), " is not FOUND!")
return
}
@ -48,18 +51,18 @@ func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
handler.PostHandle(request)
}
//为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgId uint32, router ziface.IRouter) {
//AddRouter 为消息添加具体的处理逻辑
func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) {
//1 判断当前msg绑定的API处理方法是否已经存在
if _, ok := mh.Apis[msgId]; ok {
panic("repeated api , msgId = " + strconv.Itoa(int(msgId)))
if _, ok := mh.Apis[msgID]; ok {
panic("repeated api , msgID = " + strconv.Itoa(int(msgID)))
}
//2 添加msg与api的绑定关系
mh.Apis[msgId] = router
fmt.Println("Add api msgId = ", msgId)
mh.Apis[msgID] = router
fmt.Println("Add api msgID = ", msgID)
}
//启动一个Worker工作流程
//StartOneWorker 启动一个Worker工作流程
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerID, " is started.")
//不断的等待队列中的消息
@ -72,7 +75,7 @@ func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest
}
}
//启动worker工作池
//StartWorkerPool 启动worker工作池
func (mh *MsgHandle) StartWorkerPool() {
//遍历需要启动worker的数量,依此启动
for i := 0; i < int(mh.WorkerPoolSize); i++ {

9
znet/request.go

@ -2,22 +2,23 @@ package znet
import "github.com/aceld/zinx/ziface"
//Request 请求
type Request struct {
conn ziface.IConnection //已经和客户端建立好的 链接
msg ziface.IMessage //客户端请求的数据
}
//获取请求连接信息
//GetConnection 获取请求连接信息
func (r *Request) GetConnection() ziface.IConnection {
return r.conn
}
//获取请求消息的数据
//GetData 获取请求消息的数据
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
//获取请求的消息的ID
//GetMsgID 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
return r.msg.GetMsgID()
}

12
znet/router.go

@ -2,12 +2,18 @@ package znet
import "github.com/aceld/zinx/ziface"
//实现router时,先嵌入这个基类,然后根据需要对这个基类的方法进行重写
//BaseRouter 实现router时,先嵌入这个基类,然后根据需要对这个基类的方法进行重写
type BaseRouter struct{}
//这里之所以BaseRouter的方法都为空,
// 是因为有的Router不希望有PreHandle或PostHandle
// 所以Router全部继承BaseRouter的好处是,不需要实现PreHandle和PostHandle也可以实例化
func (br *BaseRouter) PreHandle(req ziface.IRequest) {}
func (br *BaseRouter) Handle(req ziface.IRequest) {}
//PreHandle -
func (br *BaseRouter) PreHandle(req ziface.IRequest) {}
//Handle -
func (br *BaseRouter) Handle(req ziface.IRequest) {}
//PostHandle -
func (br *BaseRouter) PostHandle(req ziface.IRequest) {}

42
znet/server.go

@ -2,9 +2,10 @@ package znet
import (
"fmt"
"net"
"github.com/aceld/zinx/utils"
"github.com/aceld/zinx/ziface"
"net"
)
var zinxLogo = `
@ -20,7 +21,7 @@ var topLine = `┌────────────────────
var borderLine = ``
var bottomLine = `└───────────────────────────────────────────────────┘`
//iServer 接口实现,定义一个Server服务类
//Server 接口实现,定义一个Server服务类
type Server struct {
//服务器的名称
Name string
@ -30,7 +31,7 @@ type Server struct {
IP string
//服务绑定的端口
Port int
//当前Server的消息管理模块,用来绑定MsgId和对应的处理方法
//当前Server的消息管理模块,用来绑定MsgID和对应的处理方法
msgHandler ziface.IMsgHandle
//当前Server的链接管理器
ConnMgr ziface.IConnManager
@ -40,9 +41,7 @@ type Server struct {
OnConnStop func(conn ziface.IConnection)
}
/*
创建一个服务器句柄
*/
//NewServer 创建一个服务器句柄
func NewServer() ziface.IServer {
printLogo()
@ -59,7 +58,7 @@ func NewServer() ziface.IServer {
//============== 实现 ziface.IServer 里的全部接口方法 ========
//开启网络服务
//Start 开启网络服务
func (s *Server) Start() {
fmt.Printf("[START] Server name: %s,listenner at IP: %s, Port %d is starting\n", s.Name, s.IP, s.Port)
@ -86,8 +85,8 @@ func (s *Server) Start() {
fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")
//TODO server.go 应该有一个自动生成ID的方法
var cid uint32
cid = 0
var cID uint32
cID = 0
//3 启动server网络连接业务
for {
@ -106,8 +105,8 @@ func (s *Server) Start() {
}
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(s, conn, cid, s.msgHandler)
cid++
dealConn := NewConntion(s, conn, cID, s.msgHandler)
cID++
//3.4 启动当前链接的处理业务
go dealConn.Start()
@ -115,7 +114,7 @@ func (s *Server) Start() {
}()
}
//停止服务
//Stop 停止服务
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server , name ", s.Name)
@ -123,7 +122,7 @@ func (s *Server) Stop() {
s.ConnMgr.ClearConn()
}
//运行服务
//Serve 运行服务
func (s *Server) Serve() {
s.Start()
@ -133,27 +132,27 @@ func (s *Server) Serve() {
select {}
}
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) {
s.msgHandler.AddRouter(msgId, router)
//AddRouter 路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
func (s *Server) AddRouter(msgID uint32, router ziface.IRouter) {
s.msgHandler.AddRouter(msgID, router)
}
//得到链接管理
//GetConnMgr 得到链接管理
func (s *Server) GetConnMgr() ziface.IConnManager {
return s.ConnMgr
}
//设置该Server的连接创建时Hook函数
//SetOnConnStart 设置该Server的连接创建时Hook函数
func (s *Server) SetOnConnStart(hookFunc func(ziface.IConnection)) {
s.OnConnStart = hookFunc
}
//设置该Server的连接断开时的Hook函数
//SetOnConnStop 设置该Server的连接断开时的Hook函数
func (s *Server) SetOnConnStop(hookFunc func(ziface.IConnection)) {
s.OnConnStop = hookFunc
}
//调用连接OnConnStart Hook函数
//CallOnConnStart 调用连接OnConnStart Hook函数
func (s *Server) CallOnConnStart(conn ziface.IConnection) {
if s.OnConnStart != nil {
fmt.Println("---> CallOnConnStart....")
@ -161,7 +160,7 @@ func (s *Server) CallOnConnStart(conn ziface.IConnection) {
}
}
//调用连接OnConnStop Hook函数
//CallOnConnStop 调用连接OnConnStop Hook函数
func (s *Server) CallOnConnStop(conn ziface.IConnection) {
if s.OnConnStop != nil {
fmt.Println("---> CallOnConnStop....")
@ -169,7 +168,6 @@ func (s *Server) CallOnConnStop(conn ziface.IConnection) {
}
}
func printLogo() {
fmt.Println(zinxLogo)
fmt.Println(topLine)

9
znet/server_test.go

@ -2,11 +2,12 @@ package znet
import (
"fmt"
"github.com/aceld/zinx/ziface"
"io"
"net"
"testing"
"time"
"github.com/aceld/zinx/ziface"
)
// run in terminal:
@ -63,7 +64,7 @@ func ClientTest(i uint32) {
return
}
fmt.Printf("==> Client receive Msg: Id = %d, len = %d , data = %s\n", msg.Id, msg.DataLen, msg.Data)
fmt.Printf("==> Client receive Msg: ID = %d, len = %d , data = %s\n", msg.ID, msg.DataLen, msg.Data)
}
time.Sleep(time.Second)
@ -92,7 +93,7 @@ func (this *PingRouter) PreHandle(request ziface.IRequest) {
func (this *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
//先读取客户端的数据,再回写ping...ping...ping
fmt.Println("recv from client : msgId=", request.GetMsgID(), ", data=", string(request.GetData()))
fmt.Println("recv from client : msgID=", request.GetMsgID(), ", data=", string(request.GetData()))
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping\n"))
if err != nil {
@ -115,7 +116,7 @@ type HelloRouter struct {
func (this *HelloRouter) Handle(request ziface.IRequest) {
fmt.Println("call helloRouter Handle")
fmt.Printf("receive from client msgId=%d, data=%s\n", request.GetMsgID(), string(request.GetData()))
fmt.Printf("receive from client msgID=%d, data=%s\n", request.GetMsgID(), string(request.GetData()))
err := request.GetConnection().SendMsg(2, []byte("hello zix hello Router"))
if err != nil {

46
ztimer/timerscheduler.go

@ -29,13 +29,13 @@ type TimerScheduler struct {
//当前调度器的最高级时间轮
tw *TimeWheel
//定时器编号累加器
idGen uint32
IDGen uint32
//已经触发定时器的channel
triggerChan chan *DelayFunc
//互斥锁
sync.RWMutex
//所有注册的timerId集合
ids []uint32
//所有注册的timerID集合
IDs []uint32
}
// NewTimerScheduler 返回一个定时器调度器 ,主要创建分层定时器,并做关联,并依次启动
@ -60,45 +60,45 @@ func NewTimerScheduler() *TimerScheduler {
return &TimerScheduler{
tw: hourTw,
triggerChan: make(chan *DelayFunc, MaxChanBuff),
ids: make([]uint32, 0),
IDs: make([]uint32, 0),
}
}
//CreateTimerAt 创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
//CreateTimerAt 创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tID
func (ts *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) {
ts.Lock()
defer ts.Unlock()
ts.idGen++
ts.ids = append(ts.ids, ts.idGen)
return ts.idGen, ts.tw.AddTimer(ts.idGen, NewTimerAt(df, unixNano))
ts.IDGen++
ts.IDs = append(ts.IDs, ts.IDGen)
return ts.IDGen, ts.tw.AddTimer(ts.IDGen, NewTimerAt(df, unixNano))
}
//CreateTimerAfter 创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
//CreateTimerAfter 创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tID
func (ts *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) {
ts.Lock()
defer ts.Unlock()
ts.idGen++
ts.ids = append(ts.ids, ts.idGen)
return ts.idGen, ts.tw.AddTimer(ts.idGen, NewTimerAfter(df, duration))
ts.IDGen++
ts.IDs = append(ts.IDs, ts.IDGen)
return ts.IDGen, ts.tw.AddTimer(ts.IDGen, NewTimerAfter(df, duration))
}
//CancelTimer 删除timer
func (ts *TimerScheduler) CancelTimer(tid uint32) {
func (ts *TimerScheduler) CancelTimer(tID uint32) {
ts.Lock()
ts.Unlock()
//ts.tw.RemoveTimer(tid) 这个方法无效
//删除timerId
//ts.tw.RemoveTimer(tID) 这个方法无效
//删除timerID
var index = -1
for i := 0; i < len(ts.ids); i++ {
if ts.ids[i] == tid {
for i := 0; i < len(ts.IDs); i++ {
if ts.IDs[i] == tID {
index = i
}
}
if index > -1 {
ts.ids = append(ts.ids[:index], ts.ids[index+1:]...)
ts.IDs = append(ts.IDs[:index], ts.IDs[index+1:]...)
}
}
@ -108,9 +108,9 @@ func (ts *TimerScheduler) GetTriggerChan() chan *DelayFunc {
}
// HasTimer 是否有时间轮
func (ts *TimerScheduler) HasTimer(tid uint32) bool {
for i := 0; i < len(ts.ids); i++ {
if ts.ids[i] == tid {
func (ts *TimerScheduler) HasTimer(tID uint32) bool {
for i := 0; i < len(ts.IDs); i++ {
if ts.IDs[i] == tID {
return true
}
}
@ -125,12 +125,12 @@ func (ts *TimerScheduler) Start() {
now := UnixMilli()
//获取最近MaxTimeDelay 毫秒的超时定时器集合
timerList := ts.tw.GetTimerWithIn(MaxTimeDelay * time.Millisecond)
for tid, timer := range timerList {
for tID, timer := range timerList {
if math.Abs(float64(now-timer.unixts)) > MaxTimeDelay {
//已经超时的定时器,报警
zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts)
}
if ts.HasTimer(tid) {
if ts.HasTimer(tID) {
//将超时触发函数写入管道
ts.triggerChan <- timer.delayFunc
}

12
ztimer/timerscheduler_test.go

@ -30,9 +30,9 @@ func TestNewTimerScheduler(t *testing.T) {
//在scheduler中添加timer
for i := 1; i < 2000; i++ {
f := NewDelayFunc(foo, []interface{}{i, i * 3})
tid, err := timerScheduler.CreateTimerAfter(f, time.Duration(3*i)*time.Millisecond)
tID, err := timerScheduler.CreateTimerAfter(f, time.Duration(3*i)*time.Millisecond)
if err != nil {
zlog.Error("create timer error", tid, err)
zlog.Error("create timer error", tID, err)
break
}
}
@ -56,9 +56,9 @@ func TestNewAutoExecTimerScheduler(t *testing.T) {
//给调度器添加Timer
for i := 0; i < 2000; i++ {
f := NewDelayFunc(foo, []interface{}{i, i * 3})
tid, err := autoTS.CreateTimerAfter(f, time.Duration(3*i)*time.Millisecond)
tID, err := autoTS.CreateTimerAfter(f, time.Duration(3*i)*time.Millisecond)
if err != nil {
zlog.Error("create timer error", tid, err)
zlog.Error("create timer error", tID, err)
break
}
}
@ -80,8 +80,8 @@ func TestCancelTimerScheduler(t *testing.T) {
if nil != err {
t.Log("Scheduler.CreateTimerAfter(f1, time.Duration(3)*time.Second)", "err:", err)
}
log.Printf("timerId1=%d ,timerId2=%d\n", timerID1, timerID2)
Scheduler.CancelTimer(timerID1) //删除timerId1
log.Printf("timerID1=%d ,timerID2=%d\n", timerID1, timerID2)
Scheduler.CancelTimer(timerID1) //删除timerID1
//阻塞等待
select {}

40
ztimer/timewheel.go

@ -40,7 +40,7 @@ type TimeWheel struct {
maxCap int
//当前时间轮上的所有timer
timerQueue map[int]map[uint32]*Timer //map[int] VALUE 其中int表示当前时间轮的刻度,
// map[int] map[uint32] *Timer, uint32表示Timer的id
// map[int] map[uint32] *Timer, uint32表示Timer的ID
//下一层时间轮
nextTimeWheel *TimeWheel
//互斥锁(继承RWMutex的 RWLock,UnLock 等方法)
@ -72,7 +72,7 @@ func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWhee
/*
将一个timer定时器加入到分层时间轮中
tid: 每个定时器timer的唯一标识
tID: 每个定时器timer的唯一标识
t: 当前被加入时间轮的定时器
forceNext: 是否强制的将定时器添加到下一层时间轮
@ -81,7 +81,7 @@ func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWhee
如果当前的timer的超时时间间隔 小于一个刻度 :
如果没有下一轮时间轮
*/
func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error {
func (tw *TimeWheel) addTimer(tID uint32, t *Timer, forceNext bool) error {
defer func() error {
if err := recover(); err != nil {
errstr := fmt.Sprintf("addTimer function err : %s", err)
@ -99,7 +99,7 @@ func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error {
//得到需要跨越几个刻度
dn := delayInterval / tw.interval
//在对应的刻度上的定时器Timer集合map加入当前定时器(由于是环形,所以要求余)
tw.timerQueue[(tw.curIndex+int(dn))%tw.scales][tid] = t
tw.timerQueue[(tw.curIndex+int(dn))%tw.scales][tID] = t
return nil
}
@ -112,38 +112,38 @@ func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error {
//因为这是底层时间轮,该定时器在转动的时候,如果没有被调度者取走的话,该定时器将不会再被发现
//因为时间轮刻度已经过去,如果不强制把该定时器Timer移至下时刻,就永远不会被取走并触发调用
//所以这里强制将timer移至下个刻度的集合中,等待调用者在下次轮转之前取走该定时器
tw.timerQueue[(tw.curIndex+1)%tw.scales][tid] = t
tw.timerQueue[(tw.curIndex+1)%tw.scales][tID] = t
} else {
//如果手动添加定时器,那么直接将timer添加到对应底层时间轮的当前刻度集合中
tw.timerQueue[tw.curIndex][tid] = t
tw.timerQueue[tw.curIndex][tID] = t
}
return nil
}
//如果当前的超时时间,小于一个刻度的时间间隔,并且有下一层时间轮
if delayInterval < tw.interval {
return tw.nextTimeWheel.AddTimer(tid, t)
return tw.nextTimeWheel.AddTimer(tID, t)
}
return nil
}
//AddTimer 添加一个timer到一个时间轮中(非时间轮自转情况)
func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error {
func (tw *TimeWheel) AddTimer(tID uint32, t *Timer) error {
tw.Lock()
defer tw.Unlock()
return tw.addTimer(tid, t, false)
return tw.addTimer(tID, t, false)
}
//RemoveTimer 删除一个定时器,根据定时器的id
func (tw *TimeWheel) RemoveTimer(tid uint32) {
//RemoveTimer 删除一个定时器,根据定时器的ID
func (tw *TimeWheel) RemoveTimer(tID uint32) {
tw.Lock()
defer tw.Unlock()
for i := 0; i < tw.scales; i++ {
if _, ok := tw.timerQueue[i][tid]; ok {
delete(tw.timerQueue[i], tid)
if _, ok := tw.timerQueue[i][tID]; ok {
delete(tw.timerQueue[i], tID)
}
}
}
@ -167,16 +167,16 @@ func (tw *TimeWheel) run() {
curTimers := tw.timerQueue[tw.curIndex]
//当前定时器要重新添加 所给当前刻度再重新开辟一个map Timer容器
tw.timerQueue[tw.curIndex] = make(map[uint32]*Timer, tw.maxCap)
for tid, timer := range curTimers {
for tID, timer := range curTimers {
//这里属于时间轮自动转动,forceNext设置为true
tw.addTimer(tid, timer, true)
tw.addTimer(tID, timer, true)
}
//取出下一个刻度 挂载的全部定时器 进行重新添加 (为了安全起见,待考慮)
nextTimers := tw.timerQueue[(tw.curIndex+1)%tw.scales]
tw.timerQueue[(tw.curIndex+1)%tw.scales] = make(map[uint32]*Timer, tw.maxCap)
for tid, timer := range nextTimers {
tw.addTimer(tid, timer, true)
for tID, timer := range nextTimers {
tw.addTimer(tID, timer, true)
}
//当前刻度指针 走一格
@ -209,12 +209,12 @@ func (tw *TimeWheel) GetTimerWithIn(duration time.Duration) map[uint32]*Timer {
now := UnixMilli()
//取出当前时间轮刻度内全部Timer
for tid, timer := range leaftw.timerQueue[leaftw.curIndex] {
for tID, timer := range leaftw.timerQueue[leaftw.curIndex] {
if timer.unixts-now < int64(duration/1e6) {
//当前定时器已经超时
timerList[tid] = timer
timerList[tID] = timer
//定时器已经超时被取走,从当前时间轮上 摘除该定时器
delete(leaftw.timerQueue[leaftw.curIndex], tid)
delete(leaftw.timerQueue[leaftw.curIndex], tID)
}
}

Loading…
Cancel
Save