You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

134 lines
3.2 KiB

  1. /**
  2. * @Author: Aceld
  3. * @Date: 2019/5/8 17:43
  4. * @Mail: danbing.at@gmail.com
  5. *
  6. * 时间轮调度器
  7. * 依赖模块delayfunc.go timer.go timewheel.go
  8. */
  9. package ztimer
  10. import (
  11. "github.com/aceld/zinx/zlog"
  12. "math"
  13. "sync"
  14. "time"
  15. )
  16. const (
  17. //默认缓冲触发函数队列大小
  18. MAX_CHAN_BUFF = 2048
  19. //默认最大误差时间
  20. MAX_TIME_DELAY = 100
  21. )
  22. type TimerScheduler struct {
  23. //当前调度器的最高级时间轮
  24. tw *TimeWheel
  25. //定时器编号累加器
  26. idGen uint32
  27. //已经触发定时器的channel
  28. triggerChan chan *DelayFunc
  29. //互斥锁
  30. sync.RWMutex
  31. }
  32. /*
  33. 返回一个定时器调度器
  34. 主要创建分层定时器并做关联并依次启动
  35. */
  36. func NewTimerScheduler() *TimerScheduler {
  37. //创建秒级时间轮
  38. second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP)
  39. //创建分钟级时间轮
  40. minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP)
  41. //创建小时级时间轮
  42. hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP)
  43. //将分层时间轮做关联
  44. hour_tw.AddTimeWheel(minute_tw)
  45. minute_tw.AddTimeWheel(second_tw)
  46. //时间轮运行
  47. second_tw.Run()
  48. minute_tw.Run()
  49. hour_tw.Run()
  50. return &TimerScheduler{
  51. tw: hour_tw,
  52. triggerChan: make(chan *DelayFunc, MAX_CHAN_BUFF),
  53. }
  54. }
  55. //创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
  56. func (this *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) {
  57. this.Lock()
  58. defer this.Unlock()
  59. this.idGen++
  60. return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAt(df, unixNano))
  61. }
  62. //创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
  63. func (this *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) {
  64. this.Lock()
  65. defer this.Unlock()
  66. this.idGen++
  67. return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAfter(df, duration))
  68. }
  69. //删除timer
  70. func (this *TimerScheduler) CancelTimer(tid uint32) {
  71. this.Lock()
  72. this.Unlock()
  73. this.tw.RemoveTimer(tid)
  74. }
  75. //获取计时结束的延迟执行函数通道
  76. func (this *TimerScheduler) GetTriggerChan() chan *DelayFunc {
  77. return this.triggerChan
  78. }
  79. //非阻塞的方式启动timerSchedule
  80. func (this *TimerScheduler) Start() {
  81. go func() {
  82. for {
  83. //当前时间
  84. now := UnixMilli()
  85. //获取最近MAX_TIME_DELAY 毫秒的超时定时器集合
  86. timerList := this.tw.GetTimerWithIn(MAX_TIME_DELAY * time.Millisecond)
  87. for _, timer := range timerList {
  88. if math.Abs(float64(now-timer.unixts)) > MAX_TIME_DELAY {
  89. //已经超时的定时器,报警
  90. zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts)
  91. }
  92. //将超时触发函数写入管道
  93. this.triggerChan <- timer.delayFunc
  94. }
  95. time.Sleep(MAX_TIME_DELAY / 2 * time.Millisecond)
  96. }
  97. }()
  98. }
  99. //时间轮定时器 自动调度
  100. func NewAutoExecTimerScheduler() *TimerScheduler {
  101. //创建一个调度器
  102. autoExecScheduler := NewTimerScheduler()
  103. //启动调度器
  104. autoExecScheduler.Start()
  105. //永久从调度器中获取超时 触发的函数 并执行
  106. go func() {
  107. delayFuncChan := autoExecScheduler.GetTriggerChan()
  108. for df := range delayFuncChan {
  109. go df.Call()
  110. }
  111. }()
  112. return autoExecScheduler
  113. }