You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

156 lines
3.8 KiB

  1. /**
  2. * @Author: Aceld
  3. * @Date: 2019/5/8 17:43
  4. * @Mail: danbing.at@gmail.com
  5. *
  6. * 时间轮调度器
  7. * 依赖模块delayfunc.go timer.go timewheel.go
  8. */
  9. package ztimer
  10. import (
  11. "github.com/aceld/zinx/zlog"
  12. "math"
  13. "sync"
  14. "time"
  15. )
  16. const (
  17. //默认缓冲触发函数队列大小
  18. MAX_CHAN_BUFF = 2048
  19. //默认最大误差时间
  20. MAX_TIME_DELAY = 100
  21. )
  22. type TimerScheduler struct {
  23. //当前调度器的最高级时间轮
  24. tw *TimeWheel
  25. //定时器编号累加器
  26. idGen uint32
  27. //已经触发定时器的channel
  28. triggerChan chan *DelayFunc
  29. //互斥锁
  30. sync.RWMutex
  31. //所有注册的timerId集合
  32. ids []uint32
  33. }
  34. /*
  35. 返回一个定时器调度器
  36. 主要创建分层定时器并做关联并依次启动
  37. */
  38. func NewTimerScheduler() *TimerScheduler {
  39. //创建秒级时间轮
  40. second_tw := NewTimeWheel(SECOND_NAME, SECOND_INTERVAL, SECOND_SCALES, TIMERS_MAX_CAP)
  41. //创建分钟级时间轮
  42. minute_tw := NewTimeWheel(MINUTE_NAME, MINUTE_INTERVAL, MINUTE_SCALES, TIMERS_MAX_CAP)
  43. //创建小时级时间轮
  44. hour_tw := NewTimeWheel(HOUR_NAME, HOUR_INTERVAL, HOUR_SCALES, TIMERS_MAX_CAP)
  45. //将分层时间轮做关联
  46. hour_tw.AddTimeWheel(minute_tw)
  47. minute_tw.AddTimeWheel(second_tw)
  48. //时间轮运行
  49. second_tw.Run()
  50. minute_tw.Run()
  51. hour_tw.Run()
  52. return &TimerScheduler{
  53. tw: hour_tw,
  54. triggerChan: make(chan *DelayFunc, MAX_CHAN_BUFF),
  55. ids: make([]uint32, 0),
  56. }
  57. }
  58. //创建一个定点Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
  59. func (this *TimerScheduler) CreateTimerAt(df *DelayFunc, unixNano int64) (uint32, error) {
  60. this.Lock()
  61. defer this.Unlock()
  62. this.idGen++
  63. this.ids = append(this.ids, this.idGen)
  64. return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAt(df, unixNano))
  65. }
  66. //创建一个延迟Timer 并将Timer添加到分层时间轮中, 返回Timer的tid
  67. func (this *TimerScheduler) CreateTimerAfter(df *DelayFunc, duration time.Duration) (uint32, error) {
  68. this.Lock()
  69. defer this.Unlock()
  70. this.idGen++
  71. this.ids = append(this.ids, this.idGen)
  72. return this.idGen, this.tw.AddTimer(this.idGen, NewTimerAfter(df, duration))
  73. }
  74. //删除timer
  75. func (this *TimerScheduler) CancelTimer(tid uint32) {
  76. this.Lock()
  77. this.Unlock()
  78. //this.tw.RemoveTimer(tid) 这个方法无效
  79. //删除timerId
  80. var index = 0
  81. for i := 0; i < len(this.ids); i++ {
  82. if this.ids[i] == tid {
  83. index = i
  84. }
  85. }
  86. this.ids = append(this.ids[:index], this.ids[index+1:]...)
  87. }
  88. //获取计时结束的延迟执行函数通道
  89. func (this *TimerScheduler) GetTriggerChan() chan *DelayFunc {
  90. return this.triggerChan
  91. }
  92. func (this *TimerScheduler) HasTimer(tid uint32) bool {
  93. for i := 0; i < len(this.ids); i++ {
  94. if this.ids[i] == tid {
  95. return true
  96. }
  97. }
  98. return false
  99. }
  100. //非阻塞的方式启动timerSchedule
  101. func (this *TimerScheduler) Start() {
  102. go func() {
  103. for {
  104. //当前时间
  105. now := UnixMilli()
  106. //获取最近MAX_TIME_DELAY 毫秒的超时定时器集合
  107. timerList := this.tw.GetTimerWithIn(MAX_TIME_DELAY * time.Millisecond)
  108. for tid, timer := range timerList {
  109. if math.Abs(float64(now-timer.unixts)) > MAX_TIME_DELAY {
  110. //已经超时的定时器,报警
  111. zlog.Error("want call at ", timer.unixts, "; real call at", now, "; delay ", now-timer.unixts)
  112. }
  113. if this.HasTimer(tid) {
  114. //将超时触发函数写入管道
  115. this.triggerChan <- timer.delayFunc
  116. }
  117. }
  118. time.Sleep(MAX_TIME_DELAY / 2 * time.Millisecond)
  119. }
  120. }()
  121. }
  122. //时间轮定时器 自动调度
  123. func NewAutoExecTimerScheduler() *TimerScheduler {
  124. //创建一个调度器
  125. autoExecScheduler := NewTimerScheduler()
  126. //启动调度器
  127. autoExecScheduler.Start()
  128. //永久从调度器中获取超时 触发的函数 并执行
  129. go func() {
  130. delayFuncChan := autoExecScheduler.GetTriggerChan()
  131. for df := range delayFuncChan {
  132. go df.Call()
  133. }
  134. }()
  135. return autoExecScheduler
  136. }