diff --git a/ztimer/delayfunc.go b/ztimer/delayfunc.go new file mode 100644 index 0000000..ba9c378 --- /dev/null +++ b/ztimer/delayfunc.go @@ -0,0 +1,53 @@ +/** +* @Author: Aceld +* @Date: 2019/4/30 11:57 +* @Mail: danbing.at@gmail.com +*/ +package ztimer + +import ( + "fmt" + "reflect" + "zinx/zlog" +) + +/* + 定义一个延迟调用函数 + 延迟调用函数就是 时间定时器超时的时候,触发的事先注册好的 + 回调函数 +*/ +type DelayFunc struct { + f func(...interface{}) //f : 延迟函数调用原型 + args []interface{} //args: 延迟调用函数传递的形参 +} + +/* + 创建一个延迟调用函数 +*/ +func NewDelayFunc(f func(v ...interface{}), args []interface{}) *DelayFunc { + return &DelayFunc{ + f:f, + args:args, + } +} + +//打印当前延迟函数的信息,用于日志记录 +func (df *DelayFunc) String() string { + return fmt.Sprintf("{DelayFun:%s, args:%v}", reflect.TypeOf(df.f).Name(), df.args) +} + + + +/* + 执行延迟函数---如果执行失败,抛出异常 + */ +func (df *DelayFunc) Call() { + defer func() { + if err := recover(); err != nil { + zlog.Error(df.String(), "Call err: ", err) + } + }() + + //调用定时器超时函数 + df.f(df.args...) +} \ No newline at end of file diff --git a/ztimer/delayfunc_test.go b/ztimer/delayfunc_test.go new file mode 100644 index 0000000..6b147f1 --- /dev/null +++ b/ztimer/delayfunc_test.go @@ -0,0 +1,23 @@ +/** +* @Author: Aceld +* @Date: 2019/4/30 15:17 +* @Mail: danbing.at@gmail.com +* +* 针对 delayFunc.go 做单元测试,主要测试延迟函数结构体是否正常使用 +*/ +package ztimer + +import ( + "fmt" + "testing" +) + +func SayHello(message ...interface{}) { + fmt.Println(message[0].(string), " ",message[1].(string)) +} + +func TestDelayfunc(t *testing.T) { + df := NewDelayFunc(SayHello, []interface{}{"hello", "zinx!"}) + fmt.Println("df.String() = ", df.String()) + df.Call() +} diff --git a/ztimer/timer.go b/ztimer/timer.go new file mode 100644 index 0000000..7479108 --- /dev/null +++ b/ztimer/timer.go @@ -0,0 +1,85 @@ +/** +* @Author: Aceld +* @Date: 2019/4/30 17:42 +* @Mail: danbing.at@gmail.com + */ +package ztimer + +import ( + "time" +) + +const ( + HOUR_NAME = "HOUR" + HOUR_INTERVAL = 60*60*1e3 //ms为精度 + HOUR_SCALES = 12 + + MINUTE_NAME = "MINUTE" + MINUTE_INTERVAL = 60 * 1e3 + MINUTE_SCALES = 60 + + SECOND_NAME = "SECOND" + SECOND_INTERVAL = 1e3 + SECOND_SCALES = 60 + + TIMERS_MAX_CAP = 2048 //每个时间轮刻度挂载定时器的最大个数 +) + +/* + 注意: + 有关时间的几个换算 + time.Second(秒) = time.Millisecond * 1e3 + time.Millisecond(毫秒) = time.Microsecond * 1e3 + time.Microsecond(微秒) = time.Nanosecond * 1e3 + + time.Now().UnixNano() ==> time.Nanosecond (纳秒) + */ + +/* + 定时器实现 +*/ +type Timer struct { + //延迟调用函数 + delayFunc *DelayFunc + //调用时间(unix 时间, 单位ms) + unixts int64 +} + +//返回1970-1-1至今经历的毫秒数 +func UnixMilli() int64 { + return time.Now().UnixNano() / 1e6 +} + +/* + 创建一个定时器,在指定的时间触发 定时器方法 + df: DelayFunc类型的延迟调用函数类型 + unixNano: unix计算机从1970-1-1至今经历的纳秒数 +*/ +func NewTimerAt(df *DelayFunc, unixNano int64) *Timer { + return &Timer{ + delayFunc: df, + unixts: unixNano / 1e6, //将纳秒转换成对应的毫秒 ms ,定时器以ms为最小精度 + } +} + +/* + 创建一个定时器,在当前时间延迟duration之后触发 定时器方法 +*/ +func NewTimerAfter(df *DelayFunc, duration time.Duration) *Timer { + return NewTimerAt(df, time.Now().UnixNano()+int64(duration)) +} + +//启动定时器,用一个go承载 +func (t *Timer) Run() { + go func() { + now := UnixMilli() + //设置的定时器是否在当前时间之后 + if t.unixts > now { + //睡眠,直至时间超时,已微秒为单位进行睡眠 + time.Sleep(time.Duration(t.unixts-now) * time.Millisecond) + } + + //调用事先注册好的超时延迟方法 + t.delayFunc.Call() + }() +} diff --git a/ztimer/timer_test.go b/ztimer/timer_test.go new file mode 100644 index 0000000..89adfc1 --- /dev/null +++ b/ztimer/timer_test.go @@ -0,0 +1,32 @@ +/** +* @Author: Aceld +* @Date: 2019/5/5 10:14 +* @Mail: danbing.at@gmail.com +* +* 针对timer.go做单元测试,主要测试定时器相关接口 依赖模块delayFunc.go +*/ +package ztimer + +import ( + "fmt" + "testing" + "time" +) + +//定义一个超时函数 +func myFunc(v ...interface{}) { + fmt.Printf("No.%d function calld. delay %d second(s)\n", v[0].(int), v[1].(int)) +} + +func TestTimer(t *testing.T) { + + + for i:=0; i < 5;i ++ { + go func(i int) { + NewTimerAfter(NewDelayFunc(myFunc,[]interface{}{i, 2*i}), time.Duration(2*i)*time.Second).Run() + }(i) + } + + //主进程等待其他go,由于Run()方法是用一个新的go承载延迟方法,这里不能用waitGroup + time.Sleep(1*time.Minute) +} \ No newline at end of file diff --git a/ztimer/timerscheduler.go b/ztimer/timerscheduler.go new file mode 100644 index 0000000..1fe45fc --- /dev/null +++ b/ztimer/timerscheduler.go @@ -0,0 +1,135 @@ +/** +* @Author: Aceld +* @Date: 2019/5/8 17:43 +* @Mail: danbing.at@gmail.com +* +* 时间轮调度器 +* 依赖模块,delayfunc.go timer.go timewheel.go +*/ +package ztimer + +import ( + "math" + "sync" + "time" + "zinx/zlog" +) + +const ( + //默认缓冲触发函数队列大小 + MAX_CHAN_BUFF = 2048 + //默认最大误差时间 + MAX_TIME_DELAY = 100 +) + +type TimerScheduler struct { + //当前调度器的最高级时间轮 + tw *TimeWheel + //定时器编号累加器 + idGen uint32 + //已经触发定时器的channel + triggerChan chan *DelayFunc + //互斥锁 + sync.RWMutex +} + +/* + 返回一个定时器调度器 + + 主要创建分层定时器,并做关联,并依次启动 +*/ +func NewTimerScheduler() *TimerScheduler { + + //创建秒级时间轮 + second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP) + //创建分钟级时间轮 + minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP) + //创建小时级时间轮 + hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP) + + //将分层时间轮做关联 + hour_tw.AddTimeWheel(minute_tw) + minute_tw.AddTimeWheel(second_tw) + + //时间轮运行 + second_tw.Run() + minute_tw.Run() + hour_tw.Run() + + return &TimerScheduler{ + tw:hour_tw, + triggerChan:make(chan *DelayFunc, MAX_CHAN_BUFF), + } +} + +//创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid +func (this *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64)(uint32, error) { + this.Lock() + defer this.Unlock() + + this.idGen++ + return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAt(df, unixNano)) +} + +//创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid +func (this *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration)(uint32, error) { + this.Lock() + defer this.Unlock() + + this.idGen++ + return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAfter(df, duration)) +} + +//删除timer +func(this *TimerScheduler) CancelTimer(tid uint32) { + this.Lock() + this.Unlock() + + this.tw.RemoveTimer(tid) +} + +//获取计时结束的延迟执行函数通道 +func (this *TimerScheduler) GetTriggerChan() chan *DelayFunc { + return this.triggerChan +} + +//非阻塞的方式启动timerSchedule +func (this *TimerScheduler) Start() { + go func() { + for { + //当前时间 + now := UnixMilli() + //获取最近MAX_TIME_DELAY 毫秒的超时定时器集合 + timerList := this.tw.GetTimerWithIn(MAX_TIME_DELAY * time.Millisecond) + for _, timer := range timerList { + if math.Abs(float64(now-timer.unixts)) > MAX_TIME_DELAY { + //已经超时的定时器,报警 + zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts) + } + //将超时触发函数写入管道 + this.triggerChan <- timer.delayFunc + } + + + time.Sleep(MAX_TIME_DELAY/2 * time.Millisecond) + } + }() +} + +//时间轮定时器 自动调度 +func NewAutoExecTimerScheduler() *TimerScheduler{ + //创建一个调度器 + autoExecScheduler := NewTimerScheduler() + //启动调度器 + autoExecScheduler.Start() + + //永久从调度器中获取超时 触发的函数 并执行 + go func() { + delayFuncChan := autoExecScheduler.GetTriggerChan() + for df := range delayFuncChan { + go df.Call() + } + }() + + return autoExecScheduler +} diff --git a/ztimer/timerscheduler_test.go b/ztimer/timerscheduler_test.go new file mode 100644 index 0000000..30eca34 --- /dev/null +++ b/ztimer/timerscheduler_test.go @@ -0,0 +1,67 @@ +/** +* @Author: Aceld(刘丹冰) +* @Date: 2019/5/9 10:14 +* @Mail: danbing.at@gmail.com +* +* 时间轮定时器调度器单元测试 +*/ +package ztimer + +import ( + "fmt" + "testing" + "time" + "zinx/zlog" +) + +//触发函数 +func foo(args ...interface{}){ + fmt.Printf("I am No. %d function, delay %d ms\n", args[0].(int), args[1].(int)) +} + +//手动创建调度器运转时间轮 +func TestNewTimerScheduler(t *testing.T) { + timerScheduler := NewTimerScheduler() + timerScheduler.Start() + + //在scheduler中添加timer + for i := 1; i < 2000; i ++ { + f := NewDelayFunc(foo, []interface{}{i, i*3}) + tid, err := timerScheduler.CreateTimerAfter(f, time.Duration(3*i) * time.Millisecond) + if err != nil { + zlog.Error("create timer error", tid, err) + break + } + } + + //执行调度器触发函数 + go func() { + delayFuncChan := timerScheduler.GetTriggerChan() + for df := range delayFuncChan { + df.Call() + } + }() + + //阻塞等待 + select{} +} + +//采用自动调度器运转时间轮 +func TestNewAutoExecTimerScheduler(t *testing.T) { + autoTS := NewAutoExecTimerScheduler() + + //给调度器添加Timer + for i := 0; i < 2000; i ++ { + f := NewDelayFunc(foo, []interface{}{i, i*3}) + tid, err := autoTS.CreateTimerAfter(f, time.Duration(3*i) * time.Millisecond) + if err != nil { + zlog.Error("create timer error", tid, err) + break + } + } + + //阻塞等待 + select{} +} + + diff --git a/ztimer/timewheel.go b/ztimer/timewheel.go new file mode 100644 index 0000000..a39b5f9 --- /dev/null +++ b/ztimer/timewheel.go @@ -0,0 +1,224 @@ +/** +* @Author: Aceld +* @Date: 2019/4/30 11:57 +* @Mail: danbing.at@gmail.com + */ +package ztimer + +import ( + "errors" + "fmt" + "sync" + "time" + "zinx/zlog" +) + +/* + tips: + 一个网络服务程序时需要管理大量客户端连接的, + 其中每个客户端连接都需要管理它的 timeout 时间。 + 通常连接的超时管理一般设置为30~60秒不等,并不需要太精确的时间控制。 + 另外由于服务端管理着多达数万到数十万不等的连接数, + 因此我们没法为每个连接使用一个Timer,那样太消耗资源不现实。 + + 用时间轮的方式来管理和维护大量的timer调度,会解决上面的问题。 +*/ + +type TimeWheel struct { + //TimeWheel的名称 + name string + //刻度的时间间隔,单位ms + interval int64 + //每个时间轮上的刻度数 + scales int + //当前时间指针的指向 + curIndex int + //每个刻度所存放的timer定时器的最大容量 + maxCap int + //当前时间轮上的所有timer + timerQueue map[int]map[uint32]*Timer //map[int] VALUE 其中int表示当前时间轮的刻度, + // map[int] map[uint32] *Timer, uint32表示Timer的id号 + //下一层时间轮 + nextTimeWheel *TimeWheel + //互斥锁(继承RWMutex的 RWLock,UnLock 等方法) + sync.RWMutex +} + +/* + 创建一个时间轮 + name:时间轮的名称 + interval:每个刻度之间的duration时间间隔 + scales:当前时间轮的轮盘一共多少个刻度(如我们正常的时钟就是12个刻度) + maxCap: 每个刻度所最大保存的Timer定时器个数 +*/ +func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWheel { + tw := &TimeWheel{ + name: name, + interval: interval, + scales: scales, + maxCap: maxCap, + timerQueue: make(map[int]map[uint32]*Timer, scales), + } + //初始化map + for i := 0; i < scales; i++ { + tw.timerQueue[i] = make(map[uint32]*Timer, maxCap) + } + + zlog.Info("Init timerWhell name = ", tw.name, " is Done!") + return tw +} + +/* + 将一个timer定时器加入到分层时间轮中 + tid: 每个定时器timer的唯一标识 + t: 当前被加入时间轮的定时器 + forceNext: 是否强制的将定时器添加到下一层时间轮 + + 我们采用的算法是: + 如果当前timer的超时时间间隔 大于一个刻度,那么进行hash计算 找到对应的刻度上添加 + 如果当前的timer的超时时间间隔 小于一个刻度 : + 如果没有下一轮时间轮 +*/ +func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error { + defer func() error { + if err := recover(); err != nil { + errstr := fmt.Sprintf("addTimer function err : %s", err) + zlog.Error(errstr) + return errors.New(errstr) + } + return nil + }() + + //得到当前的超时时间间隔(ms)毫秒为单位 + delayInterval := t.unixts - UnixMilli() + + //如果当前的超时时间 大于一个刻度的时间间隔 + if delayInterval >= tw.interval { + //得到需要跨越几个刻度 + dn := delayInterval / tw.interval + //在对应的刻度上的定时器Timer集合map加入当前定时器(由于是环形,所以要求余) + tw.timerQueue[(tw.curIndex+int(dn))%tw.scales][tid] = t + + return nil + } + + //如果当前的超时时间,小于一个刻度的时间间隔,并且当前时间轮没有下一层,经度最小的时间轮 + if delayInterval < tw.interval && tw.nextTimeWheel == nil { + if forceNext == true { + //如果设置为强制移至下一个刻度,那么将定时器移至下一个刻度 + //这种情况,主要是时间轮自动轮转的情况 + //因为这是底层时间轮,该定时器在转动的时候,如果没有被调度者取走的话,该定时器将不会再被发现 + //因为时间轮刻度已经过去,如果不强制把该定时器Timer移至下时刻,就永远不会被取走并触发调用 + //所以这里强制将timer移至下个刻度的集合中,等待调用者在下次轮转之前取走该定时器 + tw.timerQueue[(tw.curIndex+1) % tw.scales][tid] = t + } else { + //如果手动添加定时器,那么直接将timer添加到对应底层时间轮的当前刻度集合中 + tw.timerQueue[tw.curIndex][tid] = t + } + return nil + } + + //如果当前的超时时间,小于一个刻度的时间间隔,并且有下一层时间轮 + if delayInterval < tw.interval { + return tw.nextTimeWheel.AddTimer(tid, t) + } + + return nil +} + +//添加一个timer到一个时间轮中(非时间轮自转情况) +func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error { + tw.Lock() + defer tw.Unlock() + + return tw.addTimer(tid, t, false) +} + +/* + 删除一个定时器,根据定时器的id +*/ +func (tw *TimeWheel) RemoveTimer(tid uint32) { + tw.Lock() + defer tw.Unlock() + + for i := 0; i < tw.scales; i++ { + if _, ok := tw.timerQueue[i][tid]; ok { + delete(tw.timerQueue[i], tid) + } + } +} + +/* + 给一个时间轮添加下层时间轮 比如给小时时间轮添加分钟时间轮,给分钟时间轮添加秒时间轮 +*/ +func (tw *TimeWheel) AddTimeWheel(next *TimeWheel) { + tw.nextTimeWheel = next + zlog.Info("Add timerWhell[", tw.name,"]'s next [", next.name,"] is succ!") +} + +/* + 启动时间轮 +*/ +func (tw *TimeWheel) run() { + for { + //时间轮每间隔interval一刻度时间,触发转动一次 + time.Sleep(time.Duration(tw.interval) * time.Millisecond) + + tw.Lock() + //取出挂载在当前刻度的全部定时器 + curTimers := tw.timerQueue[tw.curIndex] + //当前定时器要重新添加 所给当前刻度再重新开辟一个map Timer容器 + tw.timerQueue[tw.curIndex] = make(map[uint32]*Timer, tw.maxCap) + for tid, timer := range curTimers { + //这里属于时间轮自动转动,forceNext设置为true + tw.addTimer(tid, timer, true) + } + + //取出下一个刻度 挂载的全部定时器 进行重新添加 (为了安全起见,待考慮) + nextTimers := tw.timerQueue[(tw.curIndex+1) % tw.scales] + tw.timerQueue[(tw.curIndex+1) % tw.scales] = make(map[uint32]*Timer, tw.maxCap) + for tid, timer := range nextTimers { + tw.addTimer(tid, timer, true) + } + + //当前刻度指针 走一格 + tw.curIndex = (tw.curIndex+1) % tw.scales + + tw.Unlock() + } +} + +//非阻塞的方式让时间轮转起来 +func (tw *TimeWheel) Run() { + go tw.run() + zlog.Info("timerwheel name = ", tw.name, " is running...") +} + +//获取定时器在一段时间间隔内的Timer +func (tw *TimeWheel) GetTimerWithIn(duration time.Duration) map[uint32]*Timer { + //最终触发定时器的一定是挂载最底层时间轮上的定时器 + //1 找到最底层时间轮 + leaftw := tw + for leaftw.nextTimeWheel != nil { + leaftw = leaftw.nextTimeWheel + } + + leaftw.Lock() + defer leaftw.Unlock() + //返回的Timer集合 + timerList := make(map[uint32]*Timer) + + now := UnixMilli() + + //取出当前时间轮刻度内全部Timer + for tid, timer := range leaftw.timerQueue[leaftw.curIndex] { + if timer.unixts-now < int64(duration/1e6) { + //当前定时器已经超时 + timerList[tid] = timer + //定时器已经超时被取走,从当前时间轮上 摘除该定时器 + delete(leaftw.timerQueue[leaftw.curIndex], tid) + } + } + + return timerList +} diff --git a/ztimer/timewheel_test.go b/ztimer/timewheel_test.go new file mode 100644 index 0000000..1a05da3 --- /dev/null +++ b/ztimer/timewheel_test.go @@ -0,0 +1,87 @@ +/** +* @Author: Aceld +* @Date: 2019/5/7 18:00 +* @Mail: danbing.at@gmail.com +* +* 针对 timer_wheel.go 时间轮api 做单元测试, 主要测试时间轮运转功能 +* 依赖模块 delayFunc.go timer.go +*/ +package ztimer + +import ( + "fmt" + "testing" + "time" +) + +func TestTimerWheel(t *testing.T) { + //创建秒级时间轮 + second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP) + + //创建分钟级时间轮 + minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP) + + //创建小时级时间轮 + hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP) + + //将分层时间轮做关联 + hour_tw.AddTimeWheel(minute_tw) + minute_tw.AddTimeWheel(second_tw) + + fmt.Println("init timewheels done!") + + + //===== > 以上为初始化分层时间轮 <==== + + //给时间轮添加定时器 + timer1 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{1,10}), 10 * time.Second) + _ = hour_tw.AddTimer(1, timer1) + fmt.Println("add timer 1 done!") + + //给时间轮添加定时器 + timer2 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{2,20}), 20 * time.Second) + _ = hour_tw.AddTimer(2, timer2) + fmt.Println("add timer 2 done!") + + //给时间轮添加定时器 + timer3 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{3,30}), 30 * time.Second) + _ = hour_tw.AddTimer(3, timer3) + fmt.Println("add timer 3 done!") + + //给时间轮添加定时器 + timer4 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{4,40}), 40 * time.Second) + _ = hour_tw.AddTimer(4, timer4) + fmt.Println("add timer 4 done!") + + //给时间轮添加定时器 + timer5 := NewTimerAfter(NewDelayFunc(myFunc, []interface{}{5,50}), 50 * time.Second) + _ = hour_tw.AddTimer(5, timer5) + fmt.Println("add timer 5 done!") + + //时间轮运行 + second_tw.Run() + minute_tw.Run() + hour_tw.Run() + + fmt.Println("timewheels are run!") + + go func() { + n := 0.0 + for { + fmt.Println("tick...", n) + + //取出近1ms的超时定时器有哪些 + timers := hour_tw.GetTimerWithIn(1000 *time.Millisecond) + for _, timer := range timers { + //调用定时器方法 + timer.delayFunc.Call() + } + + time.Sleep(500 * time.Millisecond) + n += 0.5 + } + }() + + //主进程等待其他go,由于Run()方法是用一个新的go承载延迟方法,这里不能用waitGroup + time.Sleep(10*time.Minute) +}