Browse Source

添加zinx时间定时器、分层时间论、时间轮调度器相关模块及单元测试用例

master
aceld 5 years ago
parent
commit
f1ed0536aa
  1. 53
      ztimer/delayfunc.go
  2. 23
      ztimer/delayfunc_test.go
  3. 85
      ztimer/timer.go
  4. 32
      ztimer/timer_test.go
  5. 135
      ztimer/timerscheduler.go
  6. 67
      ztimer/timerscheduler_test.go
  7. 224
      ztimer/timewheel.go
  8. 87
      ztimer/timewheel_test.go

53
ztimer/delayfunc.go

@ -0,0 +1,53 @@
/**
* @Author: Aceld
* @Date: 2019/4/30 11:57
* @Mail: danbing.at@gmail.com
*/
package ztimer
import (
"fmt"
"reflect"
"zinx/zlog"
)
/*
定义一个延迟调用函数
延迟调用函数就是 时间定时器超时的时候触发的事先注册好的
回调函数
*/
type DelayFunc struct {
f func(...interface{}) //f : 延迟函数调用原型
args []interface{} //args: 延迟调用函数传递的形参
}
/*
创建一个延迟调用函数
*/
func NewDelayFunc(f func(v ...interface{}), args []interface{}) *DelayFunc {
return &DelayFunc{
f:f,
args:args,
}
}
//打印当前延迟函数的信息,用于日志记录
func (df *DelayFunc) String() string {
return fmt.Sprintf("{DelayFun:%s, args:%v}", reflect.TypeOf(df.f).Name(), df.args)
}
/*
执行延迟函数---如果执行失败抛出异常
*/
func (df *DelayFunc) Call() {
defer func() {
if err := recover(); err != nil {
zlog.Error(df.String(), "Call err: ", err)
}
}()
//调用定时器超时函数
df.f(df.args...)
}

23
ztimer/delayfunc_test.go

@ -0,0 +1,23 @@
/**
* @Author: Aceld
* @Date: 2019/4/30 15:17
* @Mail: danbing.at@gmail.com
*
* 针对 delayFunc.go 做单元测试主要测试延迟函数结构体是否正常使用
*/
package ztimer
import (
"fmt"
"testing"
)
func SayHello(message ...interface{}) {
fmt.Println(message[0].(string), " ",message[1].(string))
}
func TestDelayfunc(t *testing.T) {
df := NewDelayFunc(SayHello, []interface{}{"hello", "zinx!"})
fmt.Println("df.String() = ", df.String())
df.Call()
}

85
ztimer/timer.go

@ -0,0 +1,85 @@
/**
* @Author: Aceld
* @Date: 2019/4/30 17:42
* @Mail: danbing.at@gmail.com
*/
package ztimer
import (
"time"
)
const (
HOUR_NAME = "HOUR"
HOUR_INTERVAL = 60*60*1e3 //ms为精度
HOUR_SCALES = 12
MINUTE_NAME = "MINUTE"
MINUTE_INTERVAL = 60 * 1e3
MINUTE_SCALES = 60
SECOND_NAME = "SECOND"
SECOND_INTERVAL = 1e3
SECOND_SCALES = 60
TIMERS_MAX_CAP = 2048 //每个时间轮刻度挂载定时器的最大个数
)
/*
注意
有关时间的几个换算
time.Second() = time.Millisecond * 1e3
time.Millisecond(毫秒) = time.Microsecond * 1e3
time.Microsecond(微秒) = time.Nanosecond * 1e3
time.Now().UnixNano() ==> time.Nanosecond (纳秒)
*/
/*
定时器实现
*/
type Timer struct {
//延迟调用函数
delayFunc *DelayFunc
//调用时间(unix 时间, 单位ms)
unixts int64
}
//返回1970-1-1至今经历的毫秒数
func UnixMilli() int64 {
return time.Now().UnixNano() / 1e6
}
/*
创建一个定时器,在指定的时间触发 定时器方法
df: DelayFunc类型的延迟调用函数类型
unixNano: unix计算机从1970-1-1至今经历的纳秒数
*/
func NewTimerAt(df *DelayFunc, unixNano int64) *Timer {
return &Timer{
delayFunc: df,
unixts: unixNano / 1e6, //将纳秒转换成对应的毫秒 ms ,定时器以ms为最小精度
}
}
/*
创建一个定时器在当前时间延迟duration之后触发 定时器方法
*/
func NewTimerAfter(df *DelayFunc, duration time.Duration) *Timer {
return NewTimerAt(df, time.Now().UnixNano()+int64(duration))
}
//启动定时器,用一个go承载
func (t *Timer) Run() {
go func() {
now := UnixMilli()
//设置的定时器是否在当前时间之后
if t.unixts > now {
//睡眠,直至时间超时,已微秒为单位进行睡眠
time.Sleep(time.Duration(t.unixts-now) * time.Millisecond)
}
//调用事先注册好的超时延迟方法
t.delayFunc.Call()
}()
}

32
ztimer/timer_test.go

@ -0,0 +1,32 @@
/**
* @Author: Aceld
* @Date: 2019/5/5 10:14
* @Mail: danbing.at@gmail.com
*
* 针对timer.go做单元测试主要测试定时器相关接口 依赖模块delayFunc.go
*/
package ztimer
import (
"fmt"
"testing"
"time"
)
//定义一个超时函数
func myFunc(v ...interface{}) {
fmt.Printf("No.%d function calld. delay %d second(s)\n", v[0].(int), v[1].(int))
}
func TestTimer(t *testing.T) {
for i:=0; i < 5;i ++ {
go func(i int) {
NewTimerAfter(NewDelayFunc(myFunc,[]interface{}{i, 2*i}), time.Duration(2*i)*time.Second).Run()
}(i)
}
//主进程等待其他go,由于Run()方法是用一个新的go承载延迟方法,这里不能用waitGroup
time.Sleep(1*time.Minute)
}

135
ztimer/timerscheduler.go

@ -0,0 +1,135 @@
/**
* @Author: Aceld
* @Date: 2019/5/8 17:43
* @Mail: danbing.at@gmail.com
*
* 时间轮调度器
* 依赖模块delayfunc.go timer.go timewheel.go
*/
package ztimer
import (
"math"
"sync"
"time"
"zinx/zlog"
)
const (
//默认缓冲触发函数队列大小
MAX_CHAN_BUFF = 2048
//默认最大误差时间
MAX_TIME_DELAY = 100
)
type TimerScheduler struct {
//当前调度器的最高级时间轮
tw *TimeWheel
//定时器编号累加器
idGen uint32
//已经触发定时器的channel
triggerChan chan *DelayFunc
//互斥锁
sync.RWMutex
}
/*
返回一个定时器调度器
主要创建分层定时器并做关联并依次启动
*/
func NewTimerScheduler() *TimerScheduler {
//创建秒级时间轮
second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP)
//创建分钟级时间轮
minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP)
//创建小时级时间轮
hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP)
//将分层时间轮做关联
hour_tw.AddTimeWheel(minute_tw)
minute_tw.AddTimeWheel(second_tw)
//时间轮运行
second_tw.Run()
minute_tw.Run()
hour_tw.Run()
return &TimerScheduler{
tw:hour_tw,
triggerChan:make(chan *DelayFunc, MAX_CHAN_BUFF),
}
}
//创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
func (this *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64)(uint32, error) {
this.Lock()
defer this.Unlock()
this.idGen++
return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAt(df, unixNano))
}
//创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
func (this *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration)(uint32, error) {
this.Lock()
defer this.Unlock()
this.idGen++
return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAfter(df, duration))
}
//删除timer
func(this *TimerScheduler) CancelTimer(tid uint32) {
this.Lock()
this.Unlock()
this.tw.RemoveTimer(tid)
}
//获取计时结束的延迟执行函数通道
func (this *TimerScheduler) GetTriggerChan() chan *DelayFunc {
return this.triggerChan
}
//非阻塞的方式启动timerSchedule
func (this *TimerScheduler) Start() {
go func() {
for {
//当前时间
now := UnixMilli()
//获取最近MAX_TIME_DELAY 毫秒的超时定时器集合
timerList := this.tw.GetTimerWithIn(MAX_TIME_DELAY * time.Millisecond)
for _, timer := range timerList {
if math.Abs(float64(now-timer.unixts)) > MAX_TIME_DELAY {
//已经超时的定时器,报警
zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts)
}
//将超时触发函数写入管道
this.triggerChan <- timer.delayFunc
}
time.Sleep(MAX_TIME_DELAY/2 * time.Millisecond)
}
}()
}
//时间轮定时器 自动调度
func NewAutoExecTimerScheduler() *TimerScheduler{
//创建一个调度器
autoExecScheduler := NewTimerScheduler()
//启动调度器
autoExecScheduler.Start()
//永久从调度器中获取超时 触发的函数 并执行
go func() {
delayFuncChan := autoExecScheduler.GetTriggerChan()
for df := range delayFuncChan {
go df.Call()
}
}()
return autoExecScheduler
}

67
ztimer/timerscheduler_test.go

@ -0,0 +1,67 @@
/**
* @Author: Aceld(刘丹冰)
* @Date: 2019/5/9 10:14
* @Mail: danbing.at@gmail.com
*
* 时间轮定时器调度器单元测试
*/
package ztimer
import (
"fmt"
"testing"
"time"
"zinx/zlog"
)
//触发函数
func foo(args ...interface{}){
fmt.Printf("I am No. %d function, delay %d ms\n", args[0].(int), args[1].(int))
}
//手动创建调度器运转时间轮
func TestNewTimerScheduler(t *testing.T) {
timerScheduler := NewTimerScheduler()
timerScheduler.Start()
//在scheduler中添加timer
for i := 1; i < 2000; i ++ {
f := NewDelayFunc(foo, []interface{}{i, i*3})
tid, err := timerScheduler.CreateTimerAfter(f, time.Duration(3*i) * time.Millisecond)
if err != nil {
zlog.Error("create timer error", tid, err)
break
}
}
//执行调度器触发函数
go func() {
delayFuncChan := timerScheduler.GetTriggerChan()
for df := range delayFuncChan {
df.Call()
}
}()
//阻塞等待
select{}
}
//采用自动调度器运转时间轮
func TestNewAutoExecTimerScheduler(t *testing.T) {
autoTS := NewAutoExecTimerScheduler()
//给调度器添加Timer
for i := 0; i < 2000; i ++ {
f := NewDelayFunc(foo, []interface{}{i, i*3})
tid, err := autoTS.CreateTimerAfter(f, time.Duration(3*i) * time.Millisecond)
if err != nil {
zlog.Error("create timer error", tid, err)
break
}
}
//阻塞等待
select{}
}

224
ztimer/timewheel.go

@ -0,0 +1,224 @@
/**
* @Author: Aceld
* @Date: 2019/4/30 11:57
* @Mail: danbing.at@gmail.com
*/
package ztimer
import (
"errors"
"fmt"
"sync"
"time"
"zinx/zlog"
)
/*
tips:
一个网络服务程序时需要管理大量客户端连接的
其中每个客户端连接都需要管理它的 timeout 时间
通常连接的超时管理一般设置为30~60秒不等并不需要太精确的时间控制
另外由于服务端管理着多达数万到数十万不等的连接数
因此我们没法为每个连接使用一个Timer那样太消耗资源不现实
用时间轮的方式来管理和维护大量的timer调度会解决上面的问题
*/
type TimeWheel struct {
//TimeWheel的名称
name string
//刻度的时间间隔,单位ms
interval int64
//每个时间轮上的刻度数
scales int
//当前时间指针的指向
curIndex int
//每个刻度所存放的timer定时器的最大容量
maxCap int
//当前时间轮上的所有timer
timerQueue map[int]map[uint32]*Timer //map[int] VALUE 其中int表示当前时间轮的刻度,
// map[int] map[uint32] *Timer, uint32表示Timer的id号
//下一层时间轮
nextTimeWheel *TimeWheel
//互斥锁(继承RWMutex的 RWLock,UnLock 等方法)
sync.RWMutex
}
/*
创建一个时间轮
name时间轮的名称
interval每个刻度之间的duration时间间隔
scales:当前时间轮的轮盘一共多少个刻度(如我们正常的时钟就是12个刻度)
maxCap: 每个刻度所最大保存的Timer定时器个数
*/
func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWheel {
tw := &TimeWheel{
name: name,
interval: interval,
scales: scales,
maxCap: maxCap,
timerQueue: make(map[int]map[uint32]*Timer, scales),
}
//初始化map
for i := 0; i < scales; i++ {
tw.timerQueue[i] = make(map[uint32]*Timer, maxCap)
}
zlog.Info("Init timerWhell name = ", tw.name, " is Done!")
return tw
}
/*
将一个timer定时器加入到分层时间轮中
tid: 每个定时器timer的唯一标识
t: 当前被加入时间轮的定时器
forceNext: 是否强制的将定时器添加到下一层时间轮
我们采用的算法是
如果当前timer的超时时间间隔 大于一个刻度那么进行hash计算 找到对应的刻度上添加
如果当前的timer的超时时间间隔 小于一个刻度 :
如果没有下一轮时间轮
*/
func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error {
defer func() error {
if err := recover(); err != nil {
errstr := fmt.Sprintf("addTimer function err : %s", err)
zlog.Error(errstr)
return errors.New(errstr)
}
return nil
}()
//得到当前的超时时间间隔(ms)毫秒为单位
delayInterval := t.unixts - UnixMilli()
//如果当前的超时时间 大于一个刻度的时间间隔
if delayInterval >= tw.interval {
//得到需要跨越几个刻度
dn := delayInterval / tw.interval
//在对应的刻度上的定时器Timer集合map加入当前定时器(由于是环形,所以要求余)
tw.timerQueue[(tw.curIndex+int(dn))%tw.scales][tid] = t
return nil
}
//如果当前的超时时间,小于一个刻度的时间间隔,并且当前时间轮没有下一层,经度最小的时间轮
if delayInterval < tw.interval && tw.nextTimeWheel == nil {
if forceNext == true {
//如果设置为强制移至下一个刻度,那么将定时器移至下一个刻度
//这种情况,主要是时间轮自动轮转的情况
//因为这是底层时间轮,该定时器在转动的时候,如果没有被调度者取走的话,该定时器将不会再被发现
//因为时间轮刻度已经过去,如果不强制把该定时器Timer移至下时刻,就永远不会被取走并触发调用
//所以这里强制将timer移至下个刻度的集合中,等待调用者在下次轮转之前取走该定时器
tw.timerQueue[(tw.curIndex+1) % tw.scales][tid] = t
} else {
//如果手动添加定时器,那么直接将timer添加到对应底层时间轮的当前刻度集合中
tw.timerQueue[tw.curIndex][tid] = t
}
return nil
}
//如果当前的超时时间,小于一个刻度的时间间隔,并且有下一层时间轮
if delayInterval < tw.interval {
return tw.nextTimeWheel.AddTimer(tid, t)
}
return nil
}
//添加一个timer到一个时间轮中(非时间轮自转情况)
func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error {
tw.Lock()
defer tw.Unlock()
return tw.addTimer(tid, t, false)
}
/*
删除一个定时器根据定时器的id
*/
func (tw *TimeWheel) RemoveTimer(tid uint32) {
tw.Lock()
defer tw.Unlock()
for i := 0; i < tw.scales; i++ {
if _, ok := tw.timerQueue[i][tid]; ok {
delete(tw.timerQueue[i], tid)
}
}
}
/*
给一个时间轮添加下层时间轮 比如给小时时间轮添加分钟时间轮给分钟时间轮添加秒时间轮
*/
func (tw *TimeWheel) AddTimeWheel(next *TimeWheel) {
tw.nextTimeWheel = next
zlog.Info("Add timerWhell[", tw.name,"]'s next [", next.name,"] is succ!")
}
/*
启动时间轮
*/
func (tw *TimeWheel) run() {
for {
//时间轮每间隔interval一刻度时间,触发转动一次
time.Sleep(time.Duration(tw.interval) * time.Millisecond)
tw.Lock()
//取出挂载在当前刻度的全部定时器
curTimers := tw.timerQueue[tw.curIndex]
//当前定时器要重新添加 所给当前刻度再重新开辟一个map Timer容器
tw.timerQueue[tw.curIndex] = make(map[uint32]*Timer, tw.maxCap)
for tid, timer := range curTimers {
//这里属于时间轮自动转动,forceNext设置为true
tw.addTimer(tid, timer, true)
}
//取出下一个刻度 挂载的全部定时器 进行重新添加 (为了安全起见,待考慮)
nextTimers := tw.timerQueue[(tw.curIndex+1) % tw.scales]
tw.timerQueue[(tw.curIndex+1) % tw.scales] = make(map[uint32]*Timer, tw.maxCap)
for tid, timer := range nextTimers {
tw.addTimer(tid, timer, true)
}
//当前刻度指针 走一格
tw.curIndex = (tw.curIndex+1) % tw.scales
tw.Unlock()
}
}
//非阻塞的方式让时间轮转起来
func (tw *TimeWheel) Run() {
go tw.run()
zlog.Info("timerwheel name = ", tw.name, " is running...")
}
//获取定时器在一段时间间隔内的Timer
func (tw *TimeWheel) GetTimerWithIn(duration time.Duration) map[uint32]*Timer {
//最终触发定时器的一定是挂载最底层时间轮上的定时器
//1 找到最底层时间轮
leaftw := tw
for leaftw.nextTimeWheel != nil {
leaftw = leaftw.nextTimeWheel
}
leaftw.Lock()
defer leaftw.Unlock()
//返回的Timer集合
timerList := make(map[uint32]*Timer)
now := UnixMilli()
//取出当前时间轮刻度内全部Timer
for tid, timer := range leaftw.timerQueue[leaftw.curIndex] {
if timer.unixts-now < int64(duration/1e6) {
//当前定时器已经超时
timerList[tid] = timer
//定时器已经超时被取走,从当前时间轮上 摘除该定时器
delete(leaftw.timerQueue[leaftw.curIndex], tid)
}
}
return timerList
}

87
ztimer/timewheel_test.go

@ -0,0 +1,87 @@
/**
* @Author: Aceld
* @Date: 2019/5/7 18:00
* @Mail: danbing.at@gmail.com
*
* 针对 timer_wheel.go 时间轮api 做单元测试, 主要测试时间轮运转功能
* 依赖模块 delayFunc.go timer.go
*/
package ztimer
import (
"fmt"
"testing"
"time"
)
func TestTimerWheel(t *testing.T) {
//创建秒级时间轮
second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP)
//创建分钟级时间轮
minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP)
//创建小时级时间轮
hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP)
//将分层时间轮做关联
hour_tw.AddTimeWheel(minute_tw)
minute_tw.AddTimeWheel(second_tw)
fmt.Println("init timewheels done!")
//===== > 以上为初始化分层时间轮 <====
//给时间轮添加定时器
timer1 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{1,10}), 10 * time.Second)
_ = hour_tw.AddTimer(1, timer1)
fmt.Println("add timer 1 done!")
//给时间轮添加定时器
timer2 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{2,20}), 20 * time.Second)
_ = hour_tw.AddTimer(2, timer2)
fmt.Println("add timer 2 done!")
//给时间轮添加定时器
timer3 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{3,30}), 30 * time.Second)
_ = hour_tw.AddTimer(3, timer3)
fmt.Println("add timer 3 done!")
//给时间轮添加定时器
timer4 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{4,40}), 40 * time.Second)
_ = hour_tw.AddTimer(4, timer4)
fmt.Println("add timer 4 done!")
//给时间轮添加定时器
timer5 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{5,50}), 50 * time.Second)
_ = hour_tw.AddTimer(5, timer5)
fmt.Println("add timer 5 done!")
//时间轮运行
second_tw.Run()
minute_tw.Run()
hour_tw.Run()
fmt.Println("timewheels are run!")
go func() {
n := 0.0
for {
fmt.Println("tick...", n)
//取出近1ms的超时定时器有哪些
timers := hour_tw.GetTimerWithIn(1000 *time.Millisecond)
for _, timer := range timers {
//调用定时器方法
timer.delayFunc.Call()
}
time.Sleep(500 * time.Millisecond)
n += 0.5
}
}()
//主进程等待其他go,由于Run()方法是用一个新的go承载延迟方法,这里不能用waitGroup
time.Sleep(10*time.Minute)
}
Loading…
Cancel
Save