executors

[!TIP] This document was machine-translated by Google. If you find grammatical or semantic errors, or places where the description is unclear, please submit a PR.

In go-zero, executors act as a task pool: they buffer multiple tasks and then submit them as a batch, e.g. large batch inserts into clickhouse or batch sql inserts. You can also see executors in go-queue (in queue, ChunkExecutor is used to limit the byte size of a task submission).

So when you have the following requirements, you can use this component:

  • Submit tasks in batches
  • Buffer part of tasks and submit lazily
  • Delay task submission

Before explaining it in detail, let's start with a rough overview (the original article includes an architecture diagram here).

Interface design

Under the executors package, there are the following executors:

| Name | Trigger condition |
| --- | --- |
| `bulkexecutor` | submits when `maxTasks` (the maximum number of tasks) is reached |
| `chunkexecutor` | submits when `maxChunkSize` (the maximum number of bytes) is reached |
| `periodicalexecutor` | the basic executor; flushes on a fixed interval |
| `delayexecutor` | delays the execution of the passed `fn()` |
| `lessexecutor` | executes `fn()` at most once within a given time window |

You will see that, apart from delay and less with their special functions, the other three are all combinations of executor + container:

```go
func NewBulkExecutor(execute Execute, opts ...BulkOption) *BulkExecutor {
	// Options pattern: it appears in many places in go-zero and is a good
	// design choice when there are multiple configuration items.
	// https://halls-of-valhalla.org/beta/articles/functional-options-pattern-in-go,54/
	options := newBulkOptions()
	for _, opt := range opts {
		opt(&options)
	}
	// 1. Task container: execute is the function that does the actual work;
	// maxTasks is the flush threshold.
	container := &bulkContainer{
		execute:  execute,
		maxTasks: options.cachedTasks,
	}
	// 2. As you can see, BulkExecutor is built on top of PeriodicalExecutor.
	executor := &BulkExecutor{
		executor:  NewPeriodicalExecutor(options.flushInterval, container),
		container: container,
	}
	return executor
}
```

And this container is an interface:

```go
TaskContainer interface {
	// AddTask adds a task into the container and reports whether
	// the container should be flushed after the addition
	AddTask(task interface{}) bool
	// Execute runs the incoming execute func() on the collected tasks
	Execute(tasks interface{})
	// RemoveAll pops all tasks in the container once the threshold is
	// reached; they are then passed to execute func() through a channel
	RemoveAll() interface{}
}
```

This shows the dependency relationships:

  • bulkexecutor = periodicalexecutor + bulkContainer
  • chunkexecutor = periodicalexecutor + chunkContainer

[!TIP] So if you want to build your own executor, implement the three methods of the container interface and combine it with periodicalexecutor.
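As a sketch of that idea, here is a toy container that satisfies the three methods, flushing once a task-count threshold is reached. It is a self-contained illustration, not the library's `bulkContainer`:

```go
package main

import "fmt"

// TaskContainer mirrors the three-method contract described above;
// this is an illustrative copy, not the go-zero definition.
type TaskContainer interface {
	AddTask(task interface{}) bool
	Execute(tasks interface{})
	RemoveAll() interface{}
}

// rowContainer buffers up to maxTasks rows before signalling a flush.
type rowContainer struct {
	tasks    []interface{}
	maxTasks int
}

// AddTask stores the task and reports whether the container is now full,
// i.e. whether the caller should flush.
func (c *rowContainer) AddTask(task interface{}) bool {
	c.tasks = append(c.tasks, task)
	return len(c.tasks) >= c.maxTasks
}

// Execute is where the real batch work (e.g. a bulk INSERT) would go.
func (c *rowContainer) Execute(tasks interface{}) {
	rows := tasks.([]interface{})
	fmt.Printf("flushing %d rows\n", len(rows))
}

// RemoveAll pops every buffered task so the container starts empty again.
func (c *rowContainer) RemoveAll() interface{} {
	tasks := c.tasks
	c.tasks = nil
	return tasks
}

func main() {
	c := &rowContainer{maxTasks: 3}
	fmt.Println(c.AddTask("a")) // false: not full yet
	fmt.Println(c.AddTask("b")) // false
	fmt.Println(c.AddTask("c")) // true: threshold reached, flush now
	c.Execute(c.RemoveAll())    // flushing 3 rows
}
```

A container like this, passed to periodicalexecutor, would give you a custom executor with your own flush criterion.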

So back to the overview above: our focus is periodicalexecutor. Let's see how it is designed.

How to use

First look at how to use this component in business:

There is a scheduled service that synchronizes data from mysql to clickhouse at a fixed time every day:

```go
type DailyTask struct {
	ckGroup        *clickhousex.Cluster
	insertExecutor *executors.BulkExecutor
	mysqlConn      sqlx.SqlConn
}
```

Initialize bulkExecutor:

```go
func (dts *DailyTask) Init() {
	// insertIntoCk() is the function that performs the actual insert
	// (the developer writes the concrete business logic)
	dts.insertExecutor = executors.NewBulkExecutor(
		dts.insertIntoCk,
		executors.WithBulkInterval(time.Second*3), // the container automatically flushes its tasks every 3s
		executors.WithBulkTasks(10240),            // max number of tasks in the container; usually a power of 2
	)
}
```

[!TIP] A side note: clickhouse is well suited to large batch insertion; its batch insert speed is very fast, so batching makes full use of clickhouse.

Main business logic preparation:

```go
func (dts *DailyTask) insertNewData(ch chan interface{}, sqlFromDb *model.Task) error {
	for item := range ch {
		r, ok := item.(*model.Task)
		if !ok {
			continue
		}
		r.Tag = sqlFromDb.Tag
		r.TagId = sqlFromDb.Id
		r.InsertId = genInsertId()
		r.ToRedis = toRedis == constant.INCACHED
		r.UpdateWay = sqlFromDb.UpdateWay
		// 1. add the task
		if err := dts.insertExecutor.Add(r); err != nil {
			logx.Error(err)
		}
	}
	// 2. flush the task container
	dts.insertExecutor.Flush()
	// 3. wait for all tasks to finish
	dts.insertExecutor.Wait()
	return nil
}
```

[!TIP] You may be wondering why Flush() and Wait() are needed; I will explain through the source code analysis below.

Overall, usage takes 3 steps:

  • Add(): add a task to the executor
  • Flush(): flush the tasks remaining in the container
  • Wait(): wait for all tasks to complete
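The three-step contract can be sketched with a toy batcher (an illustration of the pattern, not go-zero's implementation): Add() flushes automatically at the threshold, Flush() pushes the leftovers, and Wait() blocks until all background work finishes.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a minimal stand-in for BulkExecutor showing why the
// Add/Flush/Wait sequence matters.
type batcher struct {
	mu      sync.Mutex
	buf     []int
	max     int
	wg      sync.WaitGroup
	flushed [][]int // processed batches, recorded for demonstration
}

// Add buffers a value and flushes automatically once max is reached.
func (b *batcher) Add(v int) {
	b.mu.Lock()
	b.buf = append(b.buf, v)
	full := len(b.buf) >= b.max
	b.mu.Unlock()
	if full {
		b.Flush()
	}
}

// Flush hands the current buffer to a background goroutine.
func (b *batcher) Flush() {
	b.mu.Lock()
	batch := b.buf
	b.buf = nil
	b.mu.Unlock()
	if len(batch) == 0 {
		return
	}
	b.wg.Add(1)
	go func() {
		defer b.wg.Done()
		b.mu.Lock()
		b.flushed = append(b.flushed, batch) // the real work would insert rows here
		b.mu.Unlock()
	}()
}

// Wait blocks until every in-flight batch has been processed.
func (b *batcher) Wait() { b.wg.Wait() }

func main() {
	b := &batcher{max: 4}
	for i := 0; i < 10; i++ {
		b.Add(i) // batches of 4 flush automatically
	}
	b.Flush() // push the 2 leftover items
	b.Wait()  // wait for all background flushes
	fmt.Println(len(b.flushed)) // 3 batches: 4 + 4 + 2
}
```

Without the final Flush() the last partial batch would be lost, and without Wait() the program could exit before the background goroutines finish.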

Source code analysis

[!TIP] The analysis here focuses on periodicalexecutor, because the other two commonly used executors are built on it.

Initialization

```go
func NewPeriodicalExecutor(interval time.Duration, container TaskContainer) *PeriodicalExecutor {
	executor := &PeriodicalExecutor{
		commander:   make(chan interface{}, 1),
		interval:    interval,
		container:   container,
		confirmChan: make(chan lang.PlaceholderType),
		newTicker: func(d time.Duration) timex.Ticker {
			return timex.NewTicker(interval)
		},
	}
	// ...
	return executor
}
```

  • commander: the channel that delivers batches of tasks
  • container: temporarily stores the tasks passed to Add()
  • confirmChan: blocks Add(); at the start of executeTasks(), it releases the block
  • ticker: prevents Add() from blocking indefinitely by flushing the buffered tasks on a regular schedule

Add()

After initialization, the first step in the business logic is to add tasks to the executor:

```go
func (pe *PeriodicalExecutor) Add(task interface{}) {
	if vals, ok := pe.addAndCheck(task); ok {
		pe.commander <- vals
		<-pe.confirmChan
	}
}

func (pe *PeriodicalExecutor) addAndCheck(task interface{}) (interface{}, bool) {
	pe.lock.Lock()
	defer func() {
		// guarded defaults to false
		var start bool
		if !pe.guarded {
			// backgroundFlush() resets guarded when it exits
			pe.guarded = true
			start = true
		}
		pe.lock.Unlock()
		// backgroundFlush() starts when the first task is added:
		// a background goroutine that keeps flushing tasks
		if start {
			pe.backgroundFlush()
		}
	}()
	// AddTask() enforces maxTasks: once the threshold is reached,
	// pop and return all tasks in the container
	if pe.container.AddTask(task) {
		return pe.container.RemoveAll(), true
	}
	return nil, false
}
```

In addAndCheck(), AddTask() enforces the maximum number of tasks: once the threshold is exceeded, RemoveAll() runs, the tasks popped from the buffering container are passed to commander, a goroutine loop reads them from the channel, and then executes them.

backgroundFlush()

Start a background goroutine that continuously flushes the tasks in the container:

```go
func (pe *PeriodicalExecutor) backgroundFlush() {
	// GoSafe wraps go func(){} with panic recovery
	threading.GoSafe(func() {
		ticker := pe.newTicker(pe.interval)
		defer ticker.Stop()

		var commanded bool
		last := timex.Now()
		for {
			select {
			// receive []tasks from the channel
			case vals := <-pe.commander:
				commanded = true
				// essentially wg.Add(1)
				pe.enterExecution()
				// release the block in Add(); the buffering container is
				// empty at this point, so new tasks can be added right away
				pe.confirmChan <- lang.Placeholder
				// actually execute the task logic
				pe.executeTasks(vals)
				last = timex.Now()
			case <-ticker.Chan():
				if commanded {
					// select picks ready cases randomly: if both conditions
					// were met and the commander case just ran, skip this tick
					// https://draveness.me/golang/docs/part2-foundation/ch05-keyword/golang-select/
					commanded = false
				} else if pe.Flush() {
					// flush succeeded: the container is empty, reset the idle
					// timer and start the next timed flush cycle
					last = timex.Now()
				} else if timex.Since(last) > pe.interval*idleRound {
					// maxTask was not reached, Flush() returned false, and the
					// loop has been idle too long: exit. Setting guarded = false
					// lets the next Add() start a new backgroundFlush() goroutine
					pe.guarded = false
					// flush once more to avoid losing tasks
					pe.Flush()
					return
				}
			}
		}
	})
}
```

Overall, there are two flows:

  • commander receives the tasks passed in by RemoveAll(), executes them, and releases the block so Add() can continue
  • when the ticker fires, if the first flow has not run, it automatically calls Flush() and executes the tasks
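The two branches can be sketched with a plain select loop over a command channel and a ticker. All names below are illustrative, not go-zero's:

```go
package main

import (
	"fmt"
	"time"
)

// flushLoop is a stripped-down backgroundFlush(): one case drains a
// command channel, the other fires on a ticker. It reports the total
// number of "executed" tasks on done when the channel closes.
func flushLoop(commander <-chan []int, done chan<- int) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	total := 0
	for {
		select {
		case vals, ok := <-commander:
			if !ok { // channel closed: report and exit
				done <- total
				return
			}
			total += len(vals) // "execute" the batch
		case <-ticker.C:
			// a periodic Flush() would go here; nothing is buffered
			// in this sketch, so the tick is a no-op
		}
	}
}

func main() {
	commander := make(chan []int, 1)
	done := make(chan int)
	go flushLoop(commander, done)
	commander <- []int{1, 2, 3}
	commander <- []int{4, 5}
	close(commander)
	fmt.Println(<-done) // 5 tasks executed in total
}
```

The real loop adds the guarded/idle bookkeeping on top of this skeleton so an idle goroutine can exit and be restarted by the next Add().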

Wait()

In backgroundFlush(), a function is mentioned: enterExecution():

```go
func (pe *PeriodicalExecutor) enterExecution() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Add(1)
	})
}

func (pe *PeriodicalExecutor) Wait() {
	pe.wgBarrier.Guard(func() {
		pe.waitGroup.Wait()
	})
}
```

Seen this way, you can tell why dts.insertExecutor.Wait() is needed at the end: of course you have to wait for all goroutine tasks to complete.
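The Guard pattern above can be sketched with a mutex wrapped around the WaitGroup calls. This is a toy version to show the idea, not go-zero's actual barrier implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// barrier serializes access to the WaitGroup so that Add() and Wait()
// never race each other.
type barrier struct{ mu sync.Mutex }

func (b *barrier) Guard(fn func()) {
	b.mu.Lock()
	defer b.mu.Unlock()
	fn()
}

type executor struct {
	wgBarrier barrier
	waitGroup sync.WaitGroup
}

// enterExecution registers one in-flight batch under the barrier.
func (e *executor) enterExecution() {
	e.wgBarrier.Guard(func() { e.waitGroup.Add(1) })
}

// doneExecution marks one batch as finished.
func (e *executor) doneExecution() { e.waitGroup.Done() }

// Wait blocks until every registered batch has called doneExecution().
func (e *executor) Wait() {
	e.wgBarrier.Guard(func() { e.waitGroup.Wait() })
}

func main() {
	var e executor
	results := make(chan int, 3)
	for i := 0; i < 3; i++ {
		e.enterExecution()
		go func(n int) {
			defer e.doneExecution()
			results <- n
		}(i)
	}
	e.Wait() // returns only after all three goroutines finish
	fmt.Println(len(results)) // 3
}
```

Guarding both Add(1) and Wait() with the same lock is what makes the "add then wait" ordering safe, which leads directly to the confirmChan discussion below.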

Thinking

While reading the source code, I thought about some other design points. Do you have similar questions?

  • Throughout executors, you will find locks in many places

[!TIP] go test can detect race conditions; locking is used to avoid data races

  • While analyzing confirmChan, I found that it only appeared in a later commit. Why is it designed this way?

It used to be: wg.Add(1) was written inside executeTasks(). Now it is: wg.Add(1) first, then the confirmChan block is released. If the executor function blocks, Add() keeps going because nothing blocks it, so the caller may reach executor.Wait() quickly; then wg.Wait() runs before wg.Add(1), and this panics.

For details, see the latest version of TestPeriodicalExecutor_WaitFast(); you can run it against that version to reproduce the issue.
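The fixed ordering can be reproduced with plain channels and a WaitGroup (all names below are illustrative): wg.Add(1) runs before confirmChan releases the producer, so Wait() can never start before the registration.

```go
package main

import (
	"fmt"
	"sync"
)

// addThenWait mirrors the fixed ordering: the consumer calls wg.Add(1)
// *before* releasing the producer through confirmChan, so the producer
// cannot reach wg.Wait() until the in-flight batch is registered.
func addThenWait() int {
	var wg sync.WaitGroup
	confirmChan := make(chan struct{})
	commander := make(chan []int, 1)
	executed := make(chan int, 1)

	// consumer: a stripped-down backgroundFlush()
	go func() {
		vals := <-commander
		wg.Add(1)                 // 1. register the batch first
		confirmChan <- struct{}{} // 2. only then unblock Add()
		go func() {
			defer wg.Done()
			executed <- len(vals) // "execute" the batch
		}()
	}()

	// producer: a stripped-down Add() followed by Wait()
	commander <- []int{1, 2, 3}
	<-confirmChan // Add() returns only after wg.Add(1) has run
	wg.Wait()     // so Wait() never races with the Add(1) above
	return <-executed
}

func main() {
	fmt.Println(addThenWait()) // 3
}
```

If the consumer released confirmChan before calling wg.Add(1), the producer could slip into wg.Wait() first, which is exactly the race the commit fixed.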

Summary

There are a few more executors not analyzed here; I leave their source code to you.

In short, the overall design:

  • Follow interface-oriented design
  • Flexible use of concurrent tools such as channel and waitgroup
  • The combination of execution unit + storage unit

There are many useful component tools in go-zero. Using them well is a great help to service performance and development efficiency. I hope this article brings you something useful.