You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
3.9 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package ztimer
  2. /**
  3. * @Author: Aceld
  4. * @Date: 2019/5/8 17:43
  5. * @Mail: danbing.at@gmail.com
  6. *
  7. * 时间轮调度器
  8. * 依赖模块delayfunc.go timer.go timewheel.go
  9. */
  10. import (
  11. "math"
  12. "sync"
  13. "time"
  14. "github.com/aceld/zinx/zlog"
  15. )
  16. const (
  17. //MaxChanBuff 默认缓冲触发函数队列大小
  18. MaxChanBuff = 2048
  19. //MaxTimeDelay 默认最大误差时间
  20. MaxTimeDelay = 100
  21. )
  22. //TimerScheduler 计时器调度器
  23. type TimerScheduler struct {
  24. //当前调度器的最高级时间轮
  25. tw *TimeWheel
  26. //定时器编号累加器
  27. IDGen uint32
  28. //已经触发定时器的channel
  29. triggerChan chan *DelayFunc
  30. //互斥锁
  31. sync.RWMutex
  32. //所有注册的timerID集合
  33. IDs []uint32
  34. }
  35. // NewTimerScheduler 返回一个定时器调度器 ,主要创建分层定时器,并做关联,并依次启动
  36. func NewTimerScheduler() *TimerScheduler {
  37. //创建秒级时间轮
  38. secondTw := NewTimeWheel(SecondName, SecondInterval, SecondScales, TimersMaxCap)
  39. //创建分钟级时间轮
  40. minuteTw := NewTimeWheel(MinuteName, MinuteInterval, MinuteScales, TimersMaxCap)
  41. //创建小时级时间轮
  42. hourTw := NewTimeWheel(HourName, HourInterval, HourScales, TimersMaxCap)
  43. //将分层时间轮做关联
  44. hourTw.AddTimeWheel(minuteTw)
  45. minuteTw.AddTimeWheel(secondTw)
  46. //时间轮运行
  47. secondTw.Run()
  48. minuteTw.Run()
  49. hourTw.Run()
  50. return &TimerScheduler{
  51. tw: hourTw,
  52. triggerChan: make(chan *DelayFunc, MaxChanBuff),
  53. IDs: make([]uint32, 0),
  54. }
  55. }
  56. //CreateTimerAt 创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tID
  57. func (ts *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) {
  58. ts.Lock()
  59. defer ts.Unlock()
  60. ts.IDGen++
  61. ts.IDs = append(ts.IDs, ts.IDGen)
  62. return ts.IDGen, ts.tw.AddTimer(ts.IDGen, NewTimerAt(df, unixNano))
  63. }
  64. //CreateTimerAfter 创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tID
  65. func (ts *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) {
  66. ts.Lock()
  67. defer ts.Unlock()
  68. ts.IDGen++
  69. ts.IDs = append(ts.IDs, ts.IDGen)
  70. return ts.IDGen, ts.tw.AddTimer(ts.IDGen, NewTimerAfter(df, duration))
  71. }
  72. //CancelTimer 删除timer
  73. func (ts *TimerScheduler) CancelTimer(tID uint32) {
  74. ts.Lock()
  75. ts.Unlock()
  76. //ts.tw.RemoveTimer(tID) 这个方法无效
  77. //删除timerID
  78. var index = -1
  79. for i := 0; i < len(ts.IDs); i++ {
  80. if ts.IDs[i] == tID {
  81. index = i
  82. }
  83. }
  84. if index > -1 {
  85. ts.IDs = append(ts.IDs[:index], ts.IDs[index+1:]...)
  86. }
  87. }
  88. //GetTriggerChan 获取计时结束的延迟执行函数通道
  89. func (ts *TimerScheduler) GetTriggerChan() chan *DelayFunc {
  90. return ts.triggerChan
  91. }
  92. // HasTimer 是否有时间轮
  93. func (ts *TimerScheduler) HasTimer(tID uint32) bool {
  94. for i := 0; i < len(ts.IDs); i++ {
  95. if ts.IDs[i] == tID {
  96. return true
  97. }
  98. }
  99. return false
  100. }
  101. //Start 非阻塞的方式启动timerSchedule
  102. func (ts *TimerScheduler) Start() {
  103. go func() {
  104. for {
  105. //当前时间
  106. now := UnixMilli()
  107. //获取最近MaxTimeDelay 毫秒的超时定时器集合
  108. timerList := ts.tw.GetTimerWithIn(MaxTimeDelay * time.Millisecond)
  109. for tID, timer := range timerList {
  110. if math.Abs(float64(now-timer.unixts)) > MaxTimeDelay {
  111. //已经超时的定时器,报警
  112. zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts)
  113. }
  114. if ts.HasTimer(tID) {
  115. //将超时触发函数写入管道
  116. ts.triggerChan <- timer.delayFunc
  117. }
  118. }
  119. time.Sleep(MaxTimeDelay / 2 * time.Millisecond)
  120. }
  121. }()
  122. }
  123. //NewAutoExecTimerScheduler 时间轮定时器 自动调度
  124. func NewAutoExecTimerScheduler() *TimerScheduler {
  125. //创建一个调度器
  126. autoExecScheduler := NewTimerScheduler()
  127. //启动调度器
  128. autoExecScheduler.Start()
  129. //永久从调度器中获取超时 触发的函数 并执行
  130. go func() {
  131. delayFuncChan := autoExecScheduler.GetTriggerChan()
  132. for df := range delayFuncChan {
  133. go df.Call()
  134. }
  135. }()
  136. return autoExecScheduler
  137. }