Browse Source

更新Zinx-v0.2版本-绑定conn与handle

master
aceld 6 years ago
parent
commit
4d032323a4
  1. 25
      ziface/iconnnection.go
  2. 131
      znet/connection.go
  3. 39
      znet/server.go

25
ziface/iconnnection.go

@ -0,0 +1,25 @@
package ziface
import "net"
//定义连接接口
type IConnection interface {
//启动连接,让当前连接开始工作
Start()
//停止连接,结束当前连接状态M
Stop()
//从当前连接获取原始的socket TCPConn
GetTCPConnection() *net.TCPConn
//获取当前连接ID
GetConnID() uint32
//获取远程客户端地址信息
RemoteAddr() net.Addr
//直接将数据发送数据给远程的TCP客户端
Send(data []byte) error
//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端
SendBuff(data []byte) error
}
//定义一个统一处理链接业务的接口
type HandFunc func(*net.TCPConn, []byte, int) error

131
znet/connection.go

@ -0,0 +1,131 @@
package znet
import (
"fmt"
"net"
"zinx/ziface"
)
type Connection struct {
//当前连接的socket TCP套接字
Conn *net.TCPConn
//当前连接的ID 也可以称作为SessionID,ID全局唯一
ConnID uint32
//当前连接的关闭状态
isClosed bool
//该连接的处理方法api
handleAPI ziface.HandFunc
//告知该链接已经退出/停止的channel
ExitBuffChan chan bool
//给缓冲队列发送数据的channel,
// 如果向缓冲队列发送数据,那么把数据发送到这个channel下
// SendBuffChan chan []byte
}
//创建连接的方法
func NewConntion(conn *net.TCPConn, connID uint32, callback_api ziface.HandFunc) *Connection{
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
handleAPI: callback_api,
ExitBuffChan: make(chan bool, 1),
// SendBuffChan: make(chan []byte, 512),
}
return c
}
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//读取我们最大的数据到buf中
buf := make([]byte, 512)
cnt, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//调用当前链接业务
if err := c.handleAPI(c.Conn, buf, cnt); err !=nil {
fmt.Println("connID ", c.ConnID, " handle is error")
c.ExitBuffChan <- true
return
}
}
}
//启动连接,让当前连接开始工作
func (c *Connection) Start() {
//开启处理该链接读取到客户端数据之后的请求业务
go c.StartReader()
for {
select {
case <- c.ExitBuffChan:
//得到退出消息,不再阻塞
return
}
}
//1 开启用于写回客户端数据流程的Goroutine
//2 开启用户从客户端读取数据流程的Goroutine
}
//停止连接,结束当前连接状态M
func (c *Connection) Stop() {
//1. 如果当前链接已经关闭
if c.isClosed == true {
return
}
c.isClosed = true
//TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
// 关闭socket链接
c.Conn.Close()
//通知从缓冲队列读数据的业务,该链接已经关闭
c.ExitBuffChan <- true
//关闭该链接全部管道
//close(c.ExitBuffChan)
//close(c.SendBuffChan)
}
//从当前连接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
//获取当前连接ID
func (c *Connection) GetConnID() uint32{
return c.ConnID
}
//获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
//直接将数据发送数据给远程的TCP客户端
func (c *Connection) Send(data []byte) error {
return nil
}
//将数据发送给缓冲队列,通过专门从缓冲队列读数据的go写给客户端
func (c *Connection) SendBuff(data []byte) error {
return nil
}

39
znet/server.go

@ -1,6 +1,7 @@
package znet package znet
import ( import (
"errors"
"fmt" "fmt"
"net" "net"
"time" "time"
@ -19,6 +20,16 @@ type Server struct {
Port int Port int
} }
//============== 定义当前客户端链接的handle api ===========
func CallBackToClient(conn *net.TCPConn, data []byte, cnt int) error {
//回显业务
fmt.Println("[Conn Handle] CallBackToClient ... ")
if _, err := conn.Write(data[:cnt]); err !=nil {
fmt.Println("write back buf err ", err)
return errors.New("CallBackToClient error")
}
return nil
}
//============== 实现 ziface.IServer 里的全部接口方法 ======== //============== 实现 ziface.IServer 里的全部接口方法 ========
@ -45,6 +56,10 @@ func (s *Server) Start() {
//已经监听成功 //已经监听成功
fmt.Println("start Zinx server ", s.Name, " succ, now listenning...") fmt.Println("start Zinx server ", s.Name, " succ, now listenning...")
//TODO server.go 应该有一个自动生成ID的方法
var cid uint32
cid = 0
//3 启动server网络连接业务 //3 启动server网络连接业务
for { for {
//3.1 阻塞等待客户端建立连接请求 //3.1 阻塞等待客户端建立连接请求
@ -56,25 +71,12 @@ func (s *Server) Start() {
//3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接 //3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
//3.3 TODO Server.Start() 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConntion(conn, cid, CallBackToClient)
cid ++
//我们这里暂时做一个最大512字节的回显服务
go func () {
//不断的循环从客户端获取数据
for {
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
continue
}
//回显
if _, err := conn.Write(buf[:cnt]); err !=nil {
fmt.Println("write back buf err ", err)
continue
}
}
}()
//3.4 启动当前链接的处理业务
go dealConn.Start()
} }
}() }()
} }
@ -90,7 +92,6 @@ func (s *Server) Serve() {
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加 //TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
//阻塞,否则主Go退出, listenner的go将会退出 //阻塞,否则主Go退出, listenner的go将会退出
for { for {
time.Sleep(10*time.Second) time.Sleep(10*time.Second)

Loading…
Cancel
Save