You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

222 lines
6.9 KiB

  1. package ztimer
  2. /**
  3. * @Author: Aceld
  4. * @Date: 2019/4/30 11:57
  5. * @Mail: danbing.at@gmail.com
  6. */
  7. import (
  8. "errors"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/aceld/zinx/zlog"
  13. )
  14. /*
  15. tips:
  16. 一个网络服务程序时需要管理大量客户端连接的
  17. 其中每个客户端连接都需要管理它的 timeout 时间
  18. 通常连接的超时管理一般设置为30~60秒不等并不需要太精确的时间控制
  19. 另外由于服务端管理着多达数万到数十万不等的连接数
  20. 因此我们没法为每个连接使用一个Timer那样太消耗资源不现实
  21. 用时间轮的方式来管理和维护大量的timer调度会解决上面的问题
  22. */
  23. //TimeWheel 时间轮
  24. type TimeWheel struct {
  25. //TimeWheel的名称
  26. name string
  27. //刻度的时间间隔,单位ms
  28. interval int64
  29. //每个时间轮上的刻度数
  30. scales int
  31. //当前时间指针的指向
  32. curIndex int
  33. //每个刻度所存放的timer定时器的最大容量
  34. maxCap int
  35. //当前时间轮上的所有timer
  36. timerQueue map[int]map[uint32]*Timer //map[int] VALUE 其中int表示当前时间轮的刻度,
  37. // map[int] map[uint32] *Timer, uint32表示Timer的id号
  38. //下一层时间轮
  39. nextTimeWheel *TimeWheel
  40. //互斥锁(继承RWMutex的 RWLock,UnLock 等方法)
  41. sync.RWMutex
  42. }
  43. //NewTimeWheel 创建一个时间轮
  44. func NewTimeWheel(name string, interval int64, scales int, maxCap int) *TimeWheel {
  45. // name:时间轮的名称
  46. // interval:每个刻度之间的duration时间间隔
  47. // scales:当前时间轮的轮盘一共多少个刻度(如我们正常的时钟就是12个刻度)
  48. // maxCap: 每个刻度所最大保存的Timer定时器个数
  49. tw := &TimeWheel{
  50. name: name,
  51. interval: interval,
  52. scales: scales,
  53. maxCap: maxCap,
  54. timerQueue: make(map[int]map[uint32]*Timer, scales),
  55. }
  56. //初始化map
  57. for i := 0; i < scales; i++ {
  58. tw.timerQueue[i] = make(map[uint32]*Timer, maxCap)
  59. }
  60. zlog.Info("Init timerWhell name = ", tw.name, " is Done!")
  61. return tw
  62. }
  63. /*
  64. 将一个timer定时器加入到分层时间轮中
  65. tid: 每个定时器timer的唯一标识
  66. t: 当前被加入时间轮的定时器
  67. forceNext: 是否强制的将定时器添加到下一层时间轮
  68. 我们采用的算法是
  69. 如果当前timer的超时时间间隔 大于一个刻度那么进行hash计算 找到对应的刻度上添加
  70. 如果当前的timer的超时时间间隔 小于一个刻度 :
  71. 如果没有下一轮时间轮
  72. */
  73. func (tw *TimeWheel) addTimer(tid uint32, t *Timer, forceNext bool) error {
  74. defer func() error {
  75. if err := recover(); err != nil {
  76. errstr := fmt.Sprintf("addTimer function err : %s", err)
  77. zlog.Error(errstr)
  78. return errors.New(errstr)
  79. }
  80. return nil
  81. }()
  82. //得到当前的超时时间间隔(ms)毫秒为单位
  83. delayInterval := t.unixts - UnixMilli()
  84. //如果当前的超时时间 大于一个刻度的时间间隔
  85. if delayInterval >= tw.interval {
  86. //得到需要跨越几个刻度
  87. dn := delayInterval / tw.interval
  88. //在对应的刻度上的定时器Timer集合map加入当前定时器(由于是环形,所以要求余)
  89. tw.timerQueue[(tw.curIndex+int(dn))%tw.scales][tid] = t
  90. return nil
  91. }
  92. //如果当前的超时时间,小于一个刻度的时间间隔,并且当前时间轮没有下一层,经度最小的时间轮
  93. if delayInterval < tw.interval && tw.nextTimeWheel == nil {
  94. if forceNext == true {
  95. //如果设置为强制移至下一个刻度,那么将定时器移至下一个刻度
  96. //这种情况,主要是时间轮自动轮转的情况
  97. //因为这是底层时间轮,该定时器在转动的时候,如果没有被调度者取走的话,该定时器将不会再被发现
  98. //因为时间轮刻度已经过去,如果不强制把该定时器Timer移至下时刻,就永远不会被取走并触发调用
  99. //所以这里强制将timer移至下个刻度的集合中,等待调用者在下次轮转之前取走该定时器
  100. tw.timerQueue[(tw.curIndex+1)%tw.scales][tid] = t
  101. } else {
  102. //如果手动添加定时器,那么直接将timer添加到对应底层时间轮的当前刻度集合中
  103. tw.timerQueue[tw.curIndex][tid] = t
  104. }
  105. return nil
  106. }
  107. //如果当前的超时时间,小于一个刻度的时间间隔,并且有下一层时间轮
  108. if delayInterval < tw.interval {
  109. return tw.nextTimeWheel.AddTimer(tid, t)
  110. }
  111. return nil
  112. }
  113. //AddTimer 添加一个timer到一个时间轮中(非时间轮自转情况)
  114. func (tw *TimeWheel) AddTimer(tid uint32, t *Timer) error {
  115. tw.Lock()
  116. defer tw.Unlock()
  117. return tw.addTimer(tid, t, false)
  118. }
  119. //RemoveTimer 删除一个定时器,根据定时器的id
  120. func (tw *TimeWheel) RemoveTimer(tid uint32) {
  121. tw.Lock()
  122. defer tw.Unlock()
  123. for i := 0; i < tw.scales; i++ {
  124. if _, ok := tw.timerQueue[i][tid]; ok {
  125. delete(tw.timerQueue[i], tid)
  126. }
  127. }
  128. }
  129. //AddTimeWheel 给一个时间轮添加下层时间轮 比如给小时时间轮添加分钟时间轮,给分钟时间轮添加秒时间轮
  130. func (tw *TimeWheel) AddTimeWheel(next *TimeWheel) {
  131. tw.nextTimeWheel = next
  132. zlog.Info("Add timerWhell[", tw.name, "]'s next [", next.name, "] is succ!")
  133. }
  134. /*
  135. 启动时间轮
  136. */
  137. func (tw *TimeWheel) run() {
  138. for {
  139. //时间轮每间隔interval一刻度时间,触发转动一次
  140. time.Sleep(time.Duration(tw.interval) * time.Millisecond)
  141. tw.Lock()
  142. //取出挂载在当前刻度的全部定时器
  143. curTimers := tw.timerQueue[tw.curIndex]
  144. //当前定时器要重新添加 所给当前刻度再重新开辟一个map Timer容器
  145. tw.timerQueue[tw.curIndex] = make(map[uint32]*Timer, tw.maxCap)
  146. for tid, timer := range curTimers {
  147. //这里属于时间轮自动转动,forceNext设置为true
  148. tw.addTimer(tid, timer, true)
  149. }
  150. //取出下一个刻度 挂载的全部定时器 进行重新添加 (为了安全起见,待考慮)
  151. nextTimers := tw.timerQueue[(tw.curIndex+1)%tw.scales]
  152. tw.timerQueue[(tw.curIndex+1)%tw.scales] = make(map[uint32]*Timer, tw.maxCap)
  153. for tid, timer := range nextTimers {
  154. tw.addTimer(tid, timer, true)
  155. }
  156. //当前刻度指针 走一格
  157. tw.curIndex = (tw.curIndex + 1) % tw.scales
  158. tw.Unlock()
  159. }
  160. }
  161. //Run 非阻塞的方式让时间轮转起来
  162. func (tw *TimeWheel) Run() {
  163. go tw.run()
  164. zlog.Info("timerwheel name = ", tw.name, " is running...")
  165. }
  166. //GetTimerWithIn 获取定时器在一段时间间隔内的Timer
  167. func (tw *TimeWheel) GetTimerWithIn(duration time.Duration) map[uint32]*Timer {
  168. //最终触发定时器的一定是挂载最底层时间轮上的定时器
  169. //1 找到最底层时间轮
  170. leaftw := tw
  171. for leaftw.nextTimeWheel != nil {
  172. leaftw = leaftw.nextTimeWheel
  173. }
  174. leaftw.Lock()
  175. defer leaftw.Unlock()
  176. //返回的Timer集合
  177. timerList := make(map[uint32]*Timer)
  178. now := UnixMilli()
  179. //取出当前时间轮刻度内全部Timer
  180. for tid, timer := range leaftw.timerQueue[leaftw.curIndex] {
  181. if timer.unixts-now < int64(duration/1e6) {
  182. //当前定时器已经超时
  183. timerList[tid] = timer
  184. //定时器已经超时被取走,从当前时间轮上 摘除该定时器
  185. delete(leaftw.timerQueue[leaftw.curIndex], tid)
  186. }
  187. }
  188. return timerList
  189. }