diff --git a/znet/connmanager.go b/znet/connmanager.go index c395b98..842aabc 100644 --- a/znet/connmanager.go +++ b/znet/connmanager.go @@ -3,8 +3,9 @@ package znet import ( "errors" "fmt" - "github.com/aceld/zinx/ziface" "sync" + + "github.com/aceld/zinx/ziface" ) /* diff --git a/ztimer/delayfunc.go b/ztimer/delayfunc.go index e83109f..da1499e 100644 --- a/ztimer/delayfunc.go +++ b/ztimer/delayfunc.go @@ -1,14 +1,15 @@ +package ztimer + /** * @Author: Aceld * @Date: 2019/4/30 11:57 * @Mail: danbing.at@gmail.com */ -package ztimer - import ( "fmt" - "github.com/aceld/zinx/zlog" "reflect" + + "github.com/aceld/zinx/zlog" ) /* @@ -16,14 +17,14 @@ import ( 延迟调用函数就是 时间定时器超时的时候,触发的事先注册好的 回调函数 */ + +//DelayFunc 延迟调用函数对象 type DelayFunc struct { f func(...interface{}) //f : 延迟函数调用原型 args []interface{} //args: 延迟调用函数传递的形参 } -/* - 创建一个延迟调用函数 -*/ +//NewDelayFunc 创建一个延迟调用函数 func NewDelayFunc(f func(v ...interface{}), args []interface{}) *DelayFunc { return &DelayFunc{ f: f, @@ -31,14 +32,12 @@ func NewDelayFunc(f func(v ...interface{}), args []interface{}) *DelayFunc { } } -//打印当前延迟函数的信息,用于日志记录 +//String 打印当前延迟函数的信息,用于日志记录 func (df *DelayFunc) String() string { return fmt.Sprintf("{DelayFun:%s, args:%v}", reflect.TypeOf(df.f).Name(), df.args) } -/* - 执行延迟函数---如果执行失败,抛出异常 -*/ +//Call 执行延迟函数---如果执行失败,抛出异常 func (df *DelayFunc) Call() { defer func() { if err := recover(); err != nil { diff --git a/ztimer/timer.go b/ztimer/timer.go index 2e08e33..c047744 100644 --- a/ztimer/timer.go +++ b/ztimer/timer.go @@ -1,28 +1,38 @@ +package ztimer + /** * @Author: Aceld * @Date: 2019/4/30 17:42 * @Mail: danbing.at@gmail.com */ -package ztimer import ( "time" ) const ( - HOUR_NAME = "HOUR" - HOUR_INTERVAL = 60 * 60 * 1e3 //ms为精度 - HOUR_SCALES = 12 + //HourName 小时 + HourName = "HOUR" + //HourInterval 小时间隔ms为精度 + HourInterval = 60 * 60 * 1e3 + //HourScales 12小时制 + HourScales = 12 - MINUTE_NAME = "MINUTE" - MINUTE_INTERVAL = 60 * 1e3 - MINUTE_SCALES = 60 + //MinuteName 分钟 + MinuteName = "MINUTE" + //MinuteInterval 每分钟时间间隔 + MinuteInterval = 60 * 1e3 + //MinuteScales 60分钟 + MinuteScales = 60 - SECOND_NAME = "SECOND" - SECOND_INTERVAL = 1e3 - SECOND_SCALES = 60 - - TIMERS_MAX_CAP = 2048 //每个时间轮刻度挂载定时器的最大个数 + //SecondName 秒 + SecondName = "SECOND" + //SecondInterval 秒的间隔 + SecondInterval = 1e3 + //SecondScales 60秒 + SecondScales = 60 + //TimersMaxCap //每个时间轮刻度挂载定时器的最大个数 + TimersMaxCap = 2048 ) /* @@ -35,9 +45,7 @@ const ( time.Now().UnixNano() ==> time.Nanosecond (纳秒) */ -/* - 定时器实现 -*/ +//Timer 定时器实现 type Timer struct { //延迟调用函数 delayFunc *DelayFunc @@ -45,16 +53,12 @@ type Timer struct { unixts int64 } -//返回1970-1-1至今经历的毫秒数 +//UnixMilli 返回1970-1-1至今经历的毫秒数 func UnixMilli() int64 { return time.Now().UnixNano() / 1e6 } -/* - 创建一个定时器,在指定的时间触发 定时器方法 - df: DelayFunc类型的延迟调用函数类型 - unixNano: unix计算机从1970-1-1至今经历的纳秒数 -*/ +//NewTimerAt 创建一个定时器,在指定的时间触发 定时器方法 df: DelayFunc类型的延迟调用函数类型;unixNano: unix计算机从1970-1-1至今经历的纳秒数 func NewTimerAt(df *DelayFunc, unixNano int64) *Timer { return &Timer{ delayFunc: df, @@ -62,14 +66,12 @@ func NewTimerAt(df *DelayFunc, unixNano int64) *Timer { } } -/* - 创建一个定时器,在当前时间延迟duration之后触发 定时器方法 -*/ +//NewTimerAfter 创建一个定时器,在当前时间延迟duration之后触发 定时器方法 func NewTimerAfter(df *DelayFunc, duration time.Duration) *Timer { return NewTimerAt(df, time.Now().UnixNano()+int64(duration)) } -//启动定时器,用一个go承载 +//Run 启动定时器,用一个go承载 func (t *Timer) Run() { go func() { now := UnixMilli() diff --git a/ztimer/timerscheduler.go b/ztimer/timerscheduler.go index 9953757..300c4ee 100644 --- a/ztimer/timerscheduler.go +++ b/ztimer/timerscheduler.go @@ -1,3 +1,5 @@ +package ztimer + /** * @Author: Aceld * @Date: 2019/5/8 17:43 @@ -6,22 +8,23 @@ * 时间轮调度器 * 依赖模块,delayfunc.go timer.go timewheel.go */ -package ztimer import ( - "github.com/aceld/zinx/zlog" "math" "sync" "time" + + "github.com/aceld/zinx/zlog" ) const ( - //默认缓冲触发函数队列大小 - MAX_CHAN_BUFF = 2048 - //默认最大误差时间 - MAX_TIME_DELAY = 100 + //MaxChanBuff 默认缓冲触发函数队列大小 + MaxChanBuff = 2048 + //MaxTimeDelay 默认最大误差时间 + MaxTimeDelay = 100 ) +//TimerScheduler 计时器调度器 type TimerScheduler struct { //当前调度器的最高级时间轮 tw *TimeWheel @@ -35,112 +38,109 @@ type TimerScheduler struct { ids []uint32 } -/* - 返回一个定时器调度器 - - 主要创建分层定时器,并做关联,并依次启动 -*/ +// NewTimerScheduler 返回一个定时器调度器 ,主要创建分层定时器,并做关联,并依次启动 func NewTimerScheduler() *TimerScheduler { //创建秒级时间轮 - second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP) + secondTw := NewTimeWheel(SecondName, SecondInterval, SecondScales, TimersMaxCap) //创建分钟级时间轮 - minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP) + minuteTw := NewTimeWheel(MinuteName, MinuteInterval, MinuteScales, TimersMaxCap) //创建小时级时间轮 - hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP) + hourTw := NewTimeWheel(HourName, HourInterval, HourScales, TimersMaxCap) //将分层时间轮做关联 - hour_tw.AddTimeWheel(minute_tw) - minute_tw.AddTimeWheel(second_tw) + hourTw.AddTimeWheel(minuteTw) + minuteTw.AddTimeWheel(secondTw) //时间轮运行 - second_tw.Run() - minute_tw.Run() - hour_tw.Run() + secondTw.Run() + minuteTw.Run() + hourTw.Run() return &TimerScheduler{ - tw: hour_tw, - triggerChan: make(chan *DelayFunc, MAX_CHAN_BUFF), + tw: hourTw, + triggerChan: make(chan *DelayFunc, MaxChanBuff), ids: make([]uint32, 0), } } -//创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid -func (this *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) { - this.Lock() - defer this.Unlock() +//CreateTimerAt 创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid +func (ts *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) { + ts.Lock() + defer ts.Unlock() - this.idGen++ - this.ids = append(this.ids, this.idGen) - return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAt(df, unixNano)) + ts.idGen++ + ts.ids = append(ts.ids, ts.idGen) + return ts.idGen, ts.tw.AddTimer(ts.idGen, NewTimerAt(df, unixNano)) } -//创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid -func (this *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) { - this.Lock() - defer this.Unlock() +//CreateTimerAfter 创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid +func (ts *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) { + ts.Lock() + defer ts.Unlock() - this.idGen++ - this.ids = append(this.ids, this.idGen) - return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAfter(df, duration)) + ts.idGen++ + ts.ids = append(ts.ids, ts.idGen) + return ts.idGen, ts.tw.AddTimer(ts.idGen, NewTimerAfter(df, duration)) } -//删除timer -func (this *TimerScheduler) CancelTimer(tid uint32) { - this.Lock() - this.Unlock() - //this.tw.RemoveTimer(tid) 这个方法无效 +//CancelTimer 删除timer +func (ts *TimerScheduler) CancelTimer(tid uint32) { + ts.Lock() + ts.Unlock() + //ts.tw.RemoveTimer(tid) 这个方法无效 //删除timerId var index = -1 - for i := 0; i < len(this.ids); i++ { - if this.ids[i] == tid { + for i := 0; i < len(ts.ids); i++ { + if ts.ids[i] == tid { index = i } } if index > -1 { - this.ids = append(this.ids[:index], this.ids[index+1:]...) + ts.ids = append(ts.ids[:index], ts.ids[index+1:]...) } } -//获取计时结束的延迟执行函数通道 -func (this *TimerScheduler) GetTriggerChan() chan *DelayFunc { - return this.triggerChan +//GetTriggerChan 获取计时结束的延迟执行函数通道 +func (ts *TimerScheduler) GetTriggerChan() chan *DelayFunc { + return ts.triggerChan } -func (this *TimerScheduler) HasTimer(tid uint32) bool { - for i := 0; i < len(this.ids); i++ { - if this.ids[i] == tid { +// HasTimer 是否有时间轮 +func (ts *TimerScheduler) HasTimer(tid uint32) bool { + for i := 0; i < len(ts.ids); i++ { + if ts.ids[i] == tid { return true } } return false } -//非阻塞的方式启动timerSchedule -func (this *TimerScheduler) Start() { +//Start 非阻塞的方式启动timerSchedule +func (ts *TimerScheduler) Start() { go func() { for { //当前时间 now := UnixMilli() - //获取最近MAX_TIME_DELAY 毫秒的超时定时器集合 - timerList := this.tw.GetTimerWithIn(MAX_TIME_DELAY * time.Millisecond) + //获取最近MaxTimeDelay 毫秒的超时定时器集合 + timerList := ts.tw.GetTimerWithIn(MaxTimeDelay * time.Millisecond) for tid, timer := range timerList { - if math.Abs(float64(now-timer.unixts)) > MAX_TIME_DELAY { + if math.Abs(float64(now-timer.unixts)) > MaxTimeDelay { //已经超时的定时器,报警 zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts) } - if this.HasTimer(tid) { + if ts.HasTimer(tid) { //将超时触发函数写入管道 - this.triggerChan <- timer.delayFunc + ts.triggerChan <- timer.delayFunc } } - time.Sleep(MAX_TIME_DELAY / 2 * time.Millisecond) + time.Sleep(MaxTimeDelay / 2 * time.Millisecond) } }() } -//时间轮定时器 自动调度 +//NewAutoExecTimerScheduler 时间轮定时器 自动调度 func NewAutoExecTimerScheduler() *TimerScheduler { //创建一个调度器 autoExecScheduler := NewTimerScheduler() diff --git a/ztimer/timerscheduler_test.go b/ztimer/timerscheduler_test.go index 16c8303..0c1626d 100644 --- a/ztimer/timerscheduler_test.go +++ b/ztimer/timerscheduler_test.go @@ -1,3 +1,5 @@ +package ztimer + /** * @Author: Aceld(刘丹冰) * @Date: 2019/5/9 10:14 @@ -5,14 +7,14 @@ * * 时间轮定时器调度器单元测试 */ -package ztimer import ( "fmt" - "github.com/aceld/zinx/zlog" "log" "testing" "time" + + "github.com/aceld/zinx/zlog" ) //触发函数 @@ -70,10 +72,16 @@ func TestCancelTimerScheduler(t *testing.T) { Scheduler := NewAutoExecTimerScheduler() f1 := NewDelayFunc(foo, []interface{}{3, 3}) f2 := NewDelayFunc(foo, []interface{}{5, 5}) - timerId1, _ := Scheduler.CreateTimerAfter(f1, time.Duration(3)*time.Second) - timerId2, _ := Scheduler.CreateTimerAfter(f2, time.Duration(5)*time.Second) - log.Printf("timerId1=%d ,timerId2=%d\n", timerId1, timerId2) - Scheduler.CancelTimer(timerId1) //删除timerId1 + timerID1, err := Scheduler.CreateTimerAfter(f1, time.Duration(3)*time.Second) + if nil != err { + t.Log("Scheduler.CreateTimerAfter(f1, time.Duration(3)*time.Second)", "err:", err) + } + timerID2, err := Scheduler.CreateTimerAfter(f2, time.Duration(5)*time.Second) + if nil != err { + t.Log("Scheduler.CreateTimerAfter(f1, time.Duration(3)*time.Second)", "err:", err) + } + log.Printf("timerId1=%d ,timerId2=%d\n", timerID1, timerID2) + Scheduler.CancelTimer(timerID1) //删除timerId1 //阻塞等待 select {} diff --git a/ztimer/timewheel.go b/ztimer/timewheel.go index 304dc3f..90092d0 100644 --- a/ztimer/timewheel.go +++ b/ztimer/timewheel.go @@ -1,16 +1,18 @@ +package ztimer + /** * @Author: Aceld * @Date: 2019/4/30 11:57 * @Mail: danbing.at@gmail.com */ -package ztimer import ( "errors" "fmt" - "github.com/aceld/zinx/zlog" "sync" "time" + + "github.com/aceld/zinx/zlog" ) /* @@ -24,6 +26,7 @@ import ( 用时间轮的方式来管理和维护大量的timer调度,会解决上面的问题。 */ +//TimeWheel 时间轮 type TimeWheel struct { //TimeWheel的名称 name string @@ -44,14 +47,13 @@ type TimeWheel struct { sync.RWMutex } -/* - 创建一个时间轮 - name:时间轮的名称 - interval:每个刻度之间的duration时间间隔 - scales:当前时间轮的轮盘一共多少个刻度(如我们正常的时钟就是12个刻度) - maxCap: 每个刻度所最大保存的Timer定时器个数 -*/ +//NewTimeWheel 创建一个时间轮 func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWheel { + // name:时间轮的名称 + // interval:每个刻度之间的duration时间间隔 + // scales:当前时间轮的轮盘一共多少个刻度(如我们正常的时钟就是12个刻度) + // maxCap: 每个刻度所最大保存的Timer定时器个数 + tw := &TimeWheel{ name: name, interval: interval, @@ -126,7 +128,7 @@ func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error { return nil } -//添加一个timer到一个时间轮中(非时间轮自转情况) +//AddTimer 添加一个timer到一个时间轮中(非时间轮自转情况) func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error { tw.Lock() defer tw.Unlock() @@ -134,9 +136,7 @@ func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error { return tw.addTimer(tid, t, false) } -/* - 删除一个定时器,根据定时器的id -*/ +//RemoveTimer 删除一个定时器,根据定时器的id func (tw *TimeWheel) RemoveTimer(tid uint32) { tw.Lock() defer tw.Unlock() @@ -148,9 +148,7 @@ func (tw *TimeWheel) RemoveTimer(tid uint32) { } } -/* - 给一个时间轮添加下层时间轮 比如给小时时间轮添加分钟时间轮,给分钟时间轮添加秒时间轮 -*/ +//AddTimeWheel 给一个时间轮添加下层时间轮 比如给小时时间轮添加分钟时间轮,给分钟时间轮添加秒时间轮 func (tw *TimeWheel) AddTimeWheel(next *TimeWheel) { tw.nextTimeWheel = next zlog.Info("Add timerWhell[", tw.name, "]'s next [", next.name, "] is succ!") @@ -188,13 +186,13 @@ func (tw *TimeWheel) run() { } } -//非阻塞的方式让时间轮转起来 +//Run 非阻塞的方式让时间轮转起来 func (tw *TimeWheel) Run() { go tw.run() zlog.Info("timerwheel name = ", tw.name, " is running...") } -//获取定时器在一段时间间隔内的Timer +//GetTimerWithIn 获取定时器在一段时间间隔内的Timer func (tw *TimeWheel) GetTimerWithIn(duration time.Duration) map[uint32]*Timer { //最终触发定时器的一定是挂载最底层时间轮上的定时器 //1 找到最底层时间轮 diff --git a/ztimer/timewheel_test.go b/ztimer/timewheel_test.go index 2e73eed..865eb7b 100644 --- a/ztimer/timewheel_test.go +++ b/ztimer/timewheel_test.go @@ -16,17 +16,15 @@ import ( func TestTimerWheel(t *testing.T) { //创建秒级时间轮 - second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP) - + secondTw := NewTimeWheel(SecondName, SecondInterval, SecondScales, TimersMaxCap) //创建分钟级时间轮 - minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP) - + minuteTw := NewTimeWheel(MinuteName, MinuteInterval, MinuteScales, TimersMaxCap) //创建小时级时间轮 - hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP) + hourTw := NewTimeWheel(HourName, HourInterval, HourScales, TimersMaxCap) - //将分层时间轮做关联 - hour_tw.AddTimeWheel(minute_tw) - minute_tw.AddTimeWheel(second_tw) + // 将分层时间轮做关联 + hourTw.AddTimeWheel(minuteTw) + minuteTw.AddTimeWheel(secondTw) fmt.Println("init timewheels done!") @@ -34,33 +32,33 @@ func TestTimerWheel(t *testing.T) { //给时间轮添加定时器 timer1 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{1, 10}), 10*time.Second) - _ = hour_tw.AddTimer(1, timer1) + _ = hourTw.AddTimer(1, timer1) fmt.Println("add timer 1 done!") //给时间轮添加定时器 timer2 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{2, 20}), 20*time.Second) - _ = hour_tw.AddTimer(2, timer2) + _ = hourTw.AddTimer(2, timer2) fmt.Println("add timer 2 done!") //给时间轮添加定时器 timer3 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{3, 30}), 30*time.Second) - _ = hour_tw.AddTimer(3, timer3) + _ = hourTw.AddTimer(3, timer3) fmt.Println("add timer 3 done!") //给时间轮添加定时器 timer4 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{4, 40}), 40*time.Second) - _ = hour_tw.AddTimer(4, timer4) + _ = hourTw.AddTimer(4, timer4) fmt.Println("add timer 4 done!") //给时间轮添加定时器 timer5 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{5, 50}), 50*time.Second) - _ = hour_tw.AddTimer(5, timer5) + _ = hourTw.AddTimer(5, timer5) fmt.Println("add timer 5 done!") //时间轮运行 - second_tw.Run() - minute_tw.Run() - hour_tw.Run() + secondTw.Run() + minuteTw.Run() + hourTw.Run() fmt.Println("timewheels are run!") @@ -70,7 +68,7 @@ func TestTimerWheel(t *testing.T) { fmt.Println("tick...", n) //取出近1ms的超时定时器有哪些 - timers := hour_tw.GetTimerWithIn(1000 * time.Millisecond) + timers := hourTw.GetTimerWithIn(1000 * time.Millisecond) for _, timer := range timers { //调用定时器方法 timer.delayFunc.Call()