TimingWheel

[!TIP] This document was machine-translated by Google. If you find grammatical or semantic errors, or a description is unclear, please submit a PR.

This article introduces delayed operations in go-zero. Two options are available for implementing a delayed operation:

  1. Timer: the timer maintains a priority queue and runs each task when its time arrives; the tasks that need to be executed are stored in a map.
  2. The timingWheel in collection: it maintains an array of slots for storing groups of tasks, and each slot holds a doubly linked list of tasks. Once it starts, a timer runs the tasks in one slot at every specified interval.

Option 2 reduces the cost of maintaining tasks from that of a priority queue, where inserting or deleting a single element costs O(log n) (O(n log n) for n elements), to that of a doubly linked list, O(1). Executing tasks only requires walking the tasks stored at the current time point, which is O(N).
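To make the slot structure concrete, here is a minimal sketch built on Go's standard container/list. It is not go-zero's code: the helper name newSlots and the string task values are assumptions for illustration. It shows that adding a task to a slot, or removing one through its element handle, does not depend on how many tasks the slot already holds.

```go
package main

import (
	"container/list"
	"fmt"
)

// newSlots is a hypothetical helper: it allocates one doubly linked list per slot.
func newSlots(n int) []*list.List {
	slots := make([]*list.List, n)
	for i := range slots {
		slots[i] = list.New()
	}
	return slots
}

func main() {
	slots := newSlots(8)

	// PushBack appends in O(1) and returns a handle to the new element.
	a := slots[3].PushBack("task-a")
	slots[3].PushBack("task-b")

	// Remove is O(1) when the element handle is known.
	slots[3].Remove(a)

	fmt.Println(slots[3].Len(), slots[3].Front().Value)
}
```

A priority queue would instead have to restore its ordering on every insert and delete, which is where the logarithmic cost comes from.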

timingWheel in cache

First, let's look at how the cache in collection uses TimingWheel:

    timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v interface{}) {
        key, ok := k.(string)
        if !ok {
            return
        }
        cache.Del(key)
    })
    if err != nil {
        return nil, err
    }
    cache.timingWheel = timingWheel

This initializes the cache and, at the same time, the timingWheel that handles key expiration. The parameters are, in order:

  • interval: the time span of one slot (the tick granularity)
  • numSlots: the number of time slots
  • execute: the function executed when a task becomes due

In the cache, the executed function deletes the expired key, and expiration is driven by the timingWheel as it advances time.
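The callback pattern can be tried in isolation. The sketch below is only an illustration: the type name execute mirrors the func(k, v interface{}) shape shown above, while expireFrom and the plain map standing in for the cache are assumptions, not go-zero code.

```go
package main

import "fmt"

// execute mirrors the callback shape used above: func(k, v interface{}).
type execute func(key, value interface{})

// expireFrom is a hypothetical helper that returns a callback deleting
// expired keys from data. A plain map stands in for the cache here.
func expireFrom(data map[string]interface{}) execute {
	return func(k, v interface{}) {
		key, ok := k.(string)
		if !ok {
			return // ignore keys of an unexpected type
		}
		delete(data, key)
	}
}

func main() {
	data := map[string]interface{}{"a": 1, "b": 2}
	onExpire := expireFrom(data)

	onExpire(42, nil) // not a string key: nothing is deleted
	onExpire("a", 1)  // the expired key is deleted

	fmt.Println(len(data))
}
```

The type assertion guards the callback because keys arrive as interface{} values, so a key of another type is skipped rather than causing a panic.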

Next, let's walk through the timingWheel by following how the cache uses it.

Initialization

    func newTimingWheelWithClock(interval time.Duration, numSlots int, execute Execute, ticker timex.Ticker) (
        *TimingWheel, error) {
        tw := &TimingWheel{
            interval: interval, // time span of a single slot
            ticker: ticker, // timer that advances time, one interval per tick
            slots: make([]*list.List, numSlots), // the time wheel
            timers: NewSafeMap(), // map of task{key, value} [the parameters passed to execute]
            tickedPos: numSlots - 1, // at previous virtual circle
            execute: execute, // execution function
            numSlots: numSlots, // number of slots
            setChannel: make(chan timingEntry), // the following channels are used to deliver tasks
            moveChannel: make(chan baseEntry),
            removeChannel: make(chan interface{}),
            drainChannel: make(chan func(key, value interface{})),
            stopChannel: make(chan lang.PlaceholderType),
        }
        // prepare the list stored in each slot
        tw.initSlots()
        // start a goroutine that receives and handles tasks through the channels
        go tw.run()
        return tw, nil
    }

[Figure: the "time wheel" of the timingWheel]

The figure above gives a more intuitive view of the timingWheel's “time wheel”; how it advances is explained later with reference to this figure.

go tw.run() starts a goroutine that advances time:

    func (tw *TimingWheel) run() {
        for {
            select {
            // the timer advances time -> scanAndRunTasks()
            case <-tw.ticker.Chan():
                tw.onTick()
            // adding a task sends it to setChannel
            case task := <-tw.setChannel:
                tw.setTask(&task)
            ...
            }
        }
    }

As shown, the timer starts running at initialization and fires once per interval; on each tick the underlying code takes the tasks from the list in the current slot and hands them to execute.
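The shape of this loop can be reproduced with nothing but channels. The following is a simplified sketch, not the library's implementation: the loop type and its fields are hypothetical stand-ins, ticks are sent by hand instead of coming from a real ticker, and "executing" a task just records its key.

```go
package main

import "fmt"

// loop is a hypothetical, stripped-down stand-in for TimingWheel.
type loop struct {
	ticks   chan struct{} // stands in for tw.ticker.Chan()
	set     chan string   // stands in for tw.setChannel
	stop    chan struct{} // stands in for tw.stopChannel
	done    chan struct{} // closed when run returns
	pending []string      // tasks waiting for the next tick
	fired   []string      // tasks that have been "executed"
}

func newLoop() *loop {
	return &loop{
		ticks: make(chan struct{}),
		set:   make(chan string),
		stop:  make(chan struct{}),
		done:  make(chan struct{}),
	}
}

// run owns all state; other goroutines only talk to it through channels.
func (l *loop) run() {
	defer close(l.done)
	for {
		select {
		case <-l.ticks:
			l.fired = append(l.fired, l.pending...)
			l.pending = nil
		case key := <-l.set:
			l.pending = append(l.pending, key)
		case <-l.stop:
			return
		}
	}
}

// demo adds two tasks, ticks once, adds a third task, then stops the loop.
func demo() []string {
	l := newLoop()
	go l.run()
	l.set <- "a"
	l.set <- "b"
	l.ticks <- struct{}{}
	l.set <- "c" // never fires: no tick follows
	close(l.stop)
	<-l.done // wait until run has returned before reading fired
	return l.fired
}

func main() {
	fmt.Println(demo())
}
```

Because only the run goroutine touches pending and fired, no lock is needed; the channels serialize every operation, which is the same reason the TimingWheel funnels its operations through channels.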

Task Operation

Next, let's look at setting a cache key:

    func (c *Cache) Set(key string, value interface{}) {
        c.lock.Lock()
        _, ok := c.data[key]
        c.data[key] = value
        c.lruCache.add(key)
        c.lock.Unlock()
        expiry := c.unstableExpiry.AroundDuration(c.expire)
        if ok {
            c.timingWheel.MoveTimer(key, expiry)
        } else {
            c.timingWheel.SetTimer(key, value, expiry)
        }
    }

  1. First check whether the key already exists in the data map
  2. If it exists, update its expiry -> MoveTimer()
  3. If the key is being set for the first time -> SetTimer()
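The branch between the two timer calls can be exercised on its own. In this sketch the tracker type is a hypothetical stand-in for the cache and its timingWheel; instead of calling a real timer it records which method Set would have called.

```go
package main

import "fmt"

// tracker is a hypothetical stand-in for the cache plus its timingWheel.
type tracker struct {
	data  map[string]interface{}
	calls []string // which timer method each set would invoke
}

func (tr *tracker) set(key string, value interface{}) {
	_, ok := tr.data[key] // did the key exist before this write?
	tr.data[key] = value
	if ok {
		tr.calls = append(tr.calls, "MoveTimer:"+key)
	} else {
		tr.calls = append(tr.calls, "SetTimer:"+key)
	}
}

func main() {
	tr := &tracker{data: map[string]interface{}{}}
	tr.set("a", 1) // first write: a new timer
	tr.set("a", 2) // overwrite: the existing timer is moved
	tr.set("b", 3) // first write of another key
	fmt.Println(tr.calls)
}
```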

So the way to use timingWheel is clear: callers add or update timers as needed.

Following the source code, we also find that SetTimer() and MoveTimer() both send their tasks to a channel, and the goroutine started in run() keeps receiving these task operations from the channels.

SetTimer() -> setTask()

  • task does not exist: getPosition -> pushBack to list -> setPosition
  • task exists: get from timers -> moveTask()

MoveTimer() -> moveTask()

Both call chains above end up in the same function: moveTask()

    func (tw *TimingWheel) moveTask(task baseEntry) {
        // timers: Map => get the [positionEntry「pos, task」] by key
        val, ok := tw.timers.Get(task.key)
        if !ok {
            return
        }
        timer := val.(*positionEntry)
        // delay < interval => the delay is shorter than one slot interval and there is no finer scale, so the task should be executed immediately
        if task.delay < tw.interval {
            threading.GoSafe(func() {
                tw.execute(timer.item.key, timer.item.value)
            })
            return
        }
        // delay >= interval: compute the new pos and circle in the time wheel from the delay
        pos, circle := tw.getPositionAndCircle(task.delay)
        if pos >= timer.pos {
            timer.item.circle = circle
            // record the offset between the old and new positions, so the task can be re-queued later
            timer.item.diff = pos - timer.pos
        } else if circle > 0 {
            // drop one layer and convert that circle into part of diff
            circle--
            timer.item.circle = circle
            // the slots are an array, so add numSlots [equivalent to moving to the next layer]
            timer.item.diff = tw.numSlots + pos - timer.pos
        } else {
            // the new position is earlier and the task is still in the first layer
            // mark the old task as removed and enqueue a new one to wait for execution
            timer.item.removed = true
            newItem := &timingEntry{
                baseEntry: task,
                value: timer.item.value,
            }
            tw.slots[pos].PushBack(newItem)
            tw.setTimerPosition(pos, newItem)
        }
    }

moveTask() handles the following cases:

  • delay < interval: the delay is shorter than a single tick, so the task is treated as already due and is executed immediately
  • For a changed delay:
    • newPos >= oldPos: record <newCircle, diff>, where diff = newPos - oldPos
    • newPos < oldPos and newCircle > 0: drop one circle and fold it into diff, so diff = numSlots + newPos - oldPos
    • Otherwise (the delay was only shortened): mark the old task as removed, add a new entry to the list, and wait for the next loop to execute it
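The three cases for a changed delay can be written as a small pure function. This is a sketch under assumptions: movePlan and planMove are hypothetical names, and the function only computes the bookkeeping values from moveTask() without touching any lists.

```go
package main

import "fmt"

// movePlan is a hypothetical type describing what a move does to an entry.
type movePlan struct {
	circle  int  // remaining full turns of the wheel
	diff    int  // slots to move forward when the entry is next scanned
	requeue bool // true: mark the old entry removed and push a new one at newPos
}

// planMove mirrors the branch structure of moveTask() for delay >= interval.
func planMove(oldPos, newPos, circle, numSlots int) movePlan {
	switch {
	case newPos >= oldPos:
		return movePlan{circle: circle, diff: newPos - oldPos}
	case circle > 0:
		// Borrow one full turn and express it as numSlots extra steps.
		return movePlan{circle: circle - 1, diff: numSlots + newPos - oldPos}
	default:
		return movePlan{requeue: true}
	}
}

func main() {
	fmt.Println(planMove(3, 10, 1, 16))  // later slot: keep circle, diff = 7
	fmt.Println(planMove(10, 3, 2, 16))  // earlier slot, circles left: diff = 9
	fmt.Println(planMove(10, 3, 0, 16))  // earlier slot, no circles: requeue
}
```

In the second case the diff of 9 is 16 + 3 - 10: moving from slot 10 forward to slot 3 wraps around the end of the array, which consumes the borrowed circle.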

Execute

During initialization, the timer in run() started advancing time; advancing mainly means passing the tasks in a slot's list to the execute func. Let's start with what happens on each timer tick:

    // timer: runs once every interval
    func (tw *TimingWheel) onTick() {
        // advance the current tick position on every run
        tw.tickedPos = (tw.tickedPos + 1) % tw.numSlots
        // get the doubly linked list of tasks stored at the current tick position
        l := tw.slots[tw.tickedPos]
        tw.scanAndRunTasks(l)
    }

Next, let's see how execute gets called:

    func (tw *TimingWheel) scanAndRunTasks(l *list.List) {
        // tasks{key, value} that need to run now [the parameters execute requires, passed to it in turn]
        var tasks []timingTask
        for e := l.Front(); e != nil; {
            task := e.Value.(*timingEntry)
            // the task was marked as removed; do the real deletion during the scan and delete it from the map
            if task.removed {
                next := e.Next()
                l.Remove(e)
                tw.timers.Del(task.key)
                e = next
                continue
            } else if task.circle > 0 {
                // the slot has come due, but the task is not in the first layer yet; this layer is done, so drop to the next layer
                // pos is not modified
                task.circle--
                e = e.Next()
                continue
            } else if task.diff > 0 {
                // diff was recorded earlier, so the task must be re-queued at its new position
                next := e.Next()
                l.Remove(e)
                pos := (tw.tickedPos + task.diff) % tw.numSlots
                tw.slots[pos].PushBack(task)
                tw.setTimerPosition(pos, task)
                task.diff = 0
                e = next
                continue
            }
            // none of the cases above can run yet; tasks that can run are appended to tasks
            tasks = append(tasks, timingTask{
                key: task.key,
                value: task.value,
            })
            next := e.Next()
            l.Remove(e)
            tw.timers.Del(task.key)
            e = next
        }
        // range over tasks and run execute on each one
        tw.runTasks(tasks)
    }

The branches are explained in the comments. Read this together with moveTask() above: decrementing circle and computing diff are what tie the two functions together.
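To see how circle and diff interact across several passes, the per-entry decision can be isolated. The sketch below is a simplification with hypothetical names (entry, step): it reports what one scan pass would do with an entry, in the same order of checks as scanAndRunTasks(), but without any list or map operations.

```go
package main

import "fmt"

// entry is a hypothetical, reduced version of timingEntry.
type entry struct {
	key     string
	removed bool
	circle  int
	diff    int
}

// step reports the action one scan pass takes for e at tickedPos.
// target is the slot to re-queue into, or -1 when no move happens.
func step(e *entry, tickedPos, numSlots int) (action string, target int) {
	switch {
	case e.removed:
		return "drop", -1
	case e.circle > 0:
		e.circle-- // one more full turn has passed; stay in this slot
		return "wait", -1
	case e.diff > 0:
		target = (tickedPos + e.diff) % numSlots
		e.diff = 0
		return "requeue", target
	default:
		return "run", -1
	}
}

func main() {
	e := &entry{key: "k", circle: 1, diff: 3}
	for i := 0; i < 3; i++ {
		action, target := step(e, 5, 16)
		fmt.Println(action, target)
	}
}
```

An entry that was moved with one circle and a diff of 3 therefore first waits a turn, is then pushed three slots ahead, and only runs when that later slot is scanned.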

Computing diff relies on the calculation of pos and circle:

    // interval: 4min, d: 60min, numSlots: 16, tickedPos = 15
    // steps = 15, pos = 14, circle = 0
    func (tw *TimingWheel) getPositionAndCircle(d time.Duration) (pos int, circle int) {
        steps := int(d / tw.interval)
        pos = (tw.tickedPos + steps) % tw.numSlots
        circle = (steps - 1) / tw.numSlots
        return
    }

When tickedPos = numSlots - 1 (its initial value), the process above simplifies to the following:

    steps = d / interval
    pos = (steps - 1) % numSlots
    circle = (steps - 1) / numSlots
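The worked example from the code comment can be checked with a standalone version of the calculation. This is a sketch, not the library function: positionAndCircle takes tickedPos and numSlots as parameters instead of reading struct fields, and minutes is a hypothetical convenience helper.

```go
package main

import (
	"fmt"
	"time"
)

// minutes is a hypothetical helper that keeps the examples short.
func minutes(n int) time.Duration {
	return time.Duration(n) * time.Minute
}

// positionAndCircle repeats the arithmetic of getPositionAndCircle
// with the wheel state passed in explicitly.
func positionAndCircle(tickedPos, numSlots int, interval, d time.Duration) (pos, circle int) {
	steps := int(d / interval)
	pos = (tickedPos + steps) % numSlots
	circle = (steps - 1) / numSlots
	return
}

func main() {
	// The example from the comment: steps = 15, so pos = 14 and circle = 0.
	fmt.Println(positionAndCircle(15, 16, minutes(4), minutes(60)))

	// A longer delay: steps = 17 wraps once, so pos = 0 and circle = 1.
	fmt.Println(positionAndCircle(15, 16, minutes(4), minutes(68)))

	// A wheel that has already advanced: tickedPos = 3, steps = 2, so pos = 5.
	fmt.Println(positionAndCircle(3, 16, minutes(4), minutes(8)))
}
```

Subtracting 1 before dividing by numSlots matters at the boundary: a delay of exactly numSlots steps lands back on the current slot and must fire on the next visit, so its circle is 0, not 1.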

Summary

The timingWheel is driven by a timer. As time advances, the tasks in the list (a doubly linked list) of the current slot are taken out and passed to execute.

To cover delays longer than one turn of the wheel, the time wheel uses circle layers, so the same numSlots slots are reused over and over: the timer keeps looping, and each pass over a slot drops its upper-layer tasks one layer down, so tasks on upper layers are eventually executed as the loop continues.
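The layering can be demonstrated with a miniature wheel. The code below is a toy model under stated assumptions, not go-zero's TimingWheel: miniWheel uses slices instead of linked lists, has no channels or goroutines, and supports only adding and ticking. It counts how many ticks pass before a task fires.

```go
package main

import "fmt"

// task and miniWheel are hypothetical types for this toy model.
type task struct {
	key    string
	circle int // full turns that must pass before the task may fire
}

type miniWheel struct {
	slots     [][]*task
	tickedPos int
}

func newMiniWheel(numSlots int) *miniWheel {
	return &miniWheel{slots: make([][]*task, numSlots), tickedPos: numSlots - 1}
}

// add schedules key to fire after steps ticks (steps >= 1).
func (w *miniWheel) add(key string, steps int) {
	n := len(w.slots)
	pos := (w.tickedPos + steps) % n
	w.slots[pos] = append(w.slots[pos], &task{key: key, circle: (steps - 1) / n})
}

// tick advances one slot and returns the keys that fire there.
func (w *miniWheel) tick() []string {
	w.tickedPos = (w.tickedPos + 1) % len(w.slots)
	var fired []string
	var kept []*task
	for _, t := range w.slots[w.tickedPos] {
		if t.circle > 0 {
			t.circle-- // drop one layer and wait for the next turn
			kept = append(kept, t)
			continue
		}
		fired = append(fired, t.key)
	}
	w.slots[w.tickedPos] = kept
	return fired
}

// ticksUntilFired counts the ticks needed before a single task fires.
func ticksUntilFired(numSlots, steps int) int {
	w := newMiniWheel(numSlots)
	w.add("k", steps)
	for i := 1; ; i++ {
		if len(w.tick()) > 0 {
			return i
		}
	}
}

func main() {
	// A 4-slot wheel still handles a delay of 10 steps by using circles.
	fmt.Println(ticksUntilFired(4, 10))
}
```

With only 4 slots, a delay of 10 steps is stored in slot 1 with circle 2; the slot is visited on ticks 2, 6, and 10, and the task fires on the third visit.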

go-zero contains many useful component tools. Using them well goes a long way toward improving service performance and development efficiency. I hope this article is helpful.