Batch Processor

The batch processor aggregates entries (logs or any other data) and processes them in batches. When batch_max_size is set to 1, the processor executes each entry immediately. Setting batch_max_size to a value greater than 1 makes the processor aggregate entries until the batch reaches the maximum size or the timeout expires.
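
The size-triggered half of this behavior can be pictured with a small, self-contained sketch. It is not the APISIX implementation: `new_buffer` and `push` are hypothetical names chosen for illustration, and the timeout-based flush is left out because it would need a timer.

```lua
-- Illustrative sketch only: `new_buffer` and `push` are hypothetical names,
-- not part of the APISIX batch processor API. Timeouts are not modeled.
local function new_buffer(batch_max_size, func)
    local self = { entries = {} }

    function self.push(entry)
        table.insert(self.entries, entry)
        -- Flush as soon as the buffer holds batch_max_size entries.
        if #self.entries >= batch_max_size then
            local batch = self.entries
            self.entries = {}
            func(batch)
        end
    end

    return self
end

-- Record the size of every batch handed to the processing function.
local batch_sizes = {}
local function record(batch)
    table.insert(batch_sizes, #batch)
end

-- batch_max_size = 1: every entry is processed immediately.
local immediate = new_buffer(1, record)
immediate.push("a")
immediate.push("b")

-- batch_max_size = 3: entries are held until three have accumulated.
local batched = new_buffer(3, record)
for i = 1, 4 do
    batched.push("entry " .. i)
end
```

In this sketch the first buffer calls the function twice with one entry each, while the second calls it once with three entries and keeps the fourth waiting for more entries (or, in the real processor, for a timeout).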

Configurations

The only mandatory parameter for creating a batch processor is a function. The function is executed when the batch reaches the maximum size or when the buffer duration is exceeded.

| Name | Type | Requirement | Default | Valid | Description |
| --- | --- | --- | --- | --- | --- |
| name | string | optional | logger’s name | [“http logger”,…] | A unique identifier for the batch processor. It defaults to the name of the logger plug-in that calls the batch processor; for example, the “http logger” plug-in uses the name “http logger”. |
| batch_max_size | integer | optional | 1000 | [1,…] | Sets the maximum number of logs sent in each batch. When the number of logs reaches this maximum, all logs are automatically pushed to the HTTP/HTTPS service. |
| inactive_timeout | integer | optional | 5 | [1,…] | The maximum time to refresh the buffer (in seconds). When this time is reached, all logs are automatically pushed to the HTTP/HTTPS service, regardless of whether the number of logs in the buffer has reached the configured maximum. |
| buffer_duration | integer | optional | 60 | [1,…] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed. |
| max_retry_count | integer | optional | 0 | [0,…] | Maximum number of retries before removing the entry from the processing pipeline when an error occurs. |
| retry_delay | integer | optional | 1 | [0,…] | Number of seconds the process execution should be delayed if the execution fails. |
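
To make the defaults and valid ranges above concrete, here is a minimal plain-Lua sketch that fills in missing fields and rejects out-of-range values. The helper `resolve` and the tables `defaults` and `minimums` are hypothetical and only mirror the parameter list; this is not how APISIX itself validates plugin configuration.

```lua
-- Illustrative sketch only: `defaults`, `minimums` and `resolve` are
-- hypothetical helpers, not APISIX functions. Values mirror the parameter list.
local defaults = {
    batch_max_size = 1000,
    inactive_timeout = 5,
    buffer_duration = 60,
    max_retry_count = 0,
    retry_delay = 1,
}

local minimums = {
    batch_max_size = 1,
    inactive_timeout = 1,
    buffer_duration = 1,
    max_retry_count = 0,
    retry_delay = 0,
}

-- Fill in missing fields and reject non-integers or values below the minimum.
local function resolve(conf)
    local resolved = {}
    for key, default in pairs(defaults) do
        local value = conf[key]
        if value == nil then
            value = default
        end
        if type(value) ~= "number" or math.floor(value) ~= value
           or value < minimums[key] then
            return nil, "invalid value for " .. key
        end
        resolved[key] = value
    end
    return resolved
end

-- Only two fields are set; the rest fall back to the documented defaults.
local conf = resolve({ batch_max_size = 10, max_retry_count = 1 })

-- inactive_timeout must be at least 1, so this configuration is rejected.
local bad, err = resolve({ inactive_timeout = 0 })
```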

The following code shows an example of how to use the batch processor in your plugin:

    local bp_manager_mod = require("apisix.utils.batch-processor-manager")
    ...
    local plugin_name = "xxx-logger"
    local batch_processor_manager = bp_manager_mod.new(plugin_name)
    local schema = {...}
    local _M = {
        ...
        name = plugin_name,
        schema = batch_processor_manager:wrap_schema(schema),
    }
    ...
    function _M.log(conf, ctx)
        local entry = {...} -- data to log
        if batch_processor_manager:add_entry(conf, entry) then
            return
        end
        -- create a new processor if not found
        -- entries is an array table of entry, which can be processed in batch
        local func = function(entries)
            -- serialize to json array core.json.encode(entries)
            -- process/send data
            return true
            -- return false, err_msg, first_fail if failed
            -- first_fail (optional) indicates first_fail-1 entries have been successfully processed
            -- and during processing of entries[first_fail], the error occurred. So the batch processor
            -- only retries for the entries having index >= first_fail as per the retry policy.
        end
        batch_processor_manager:add_entry_to_new_processor(conf, entry, ctx, func)
    end
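
The role of first_fail described in the comments can be illustrated with a small stand-alone sketch. `process_with_retry` is a hypothetical helper that mimics the described policy (one initial attempt plus up to max_retry_count retries, retrying only entries from index first_fail onward); it is not the APISIX implementation and it ignores retry_delay.

```lua
-- Illustrative sketch only: `process_with_retry` is a hypothetical helper,
-- not the APISIX implementation. retry_delay is not modeled.
local function process_with_retry(entries, func, max_retry_count)
    local calls = 0
    local pending = entries
    -- One initial attempt plus up to max_retry_count retries.
    for _ = 0, max_retry_count do
        calls = calls + 1
        local ok, _, first_fail = func(pending)
        if ok then
            return true, calls
        end
        -- Keep only entries[first_fail] onward for the next attempt.
        if first_fail and first_fail > 1 then
            local rest = {}
            for i = first_fail, #pending do
                table.insert(rest, pending[i])
            end
            pending = rest
        end
    end
    return false, calls
end

-- A processing function that fails once, on the third entry, then succeeds.
local seen = {}
local failed_once = false
local function func(entries)
    table.insert(seen, #entries)
    if not failed_once then
        failed_once = true
        -- Entries 1 and 2 were processed; entries[3] caused the error.
        return false, "temporary failure", 3
    end
    return true
end

local ok, calls = process_with_retry({ "a", "b", "c", "d" }, func, 1)
```

Here the first call receives all four entries and reports first_fail = 3, so the single retry receives only the last two entries instead of the whole batch.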

The batch processor’s configuration is set inside the plugin’s configuration. For example:

    curl http://127.0.0.1:9180/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
    {
        "plugins": {
            "http-logger": {
                "uri": "http://mockbin.org/bin/:ID",
                "batch_max_size": 10,
                "max_retry_count": 1
            }
        },
        "upstream": {
            "type": "roundrobin",
            "nodes": {
                "127.0.0.1:1980": 1
            }
        },
        "uri": "/hello"
    }'

If your plugin only uses one global batch processor, you can also use the processor directly:

    -- module-level requires and state used by the snippet below
    local core = require("apisix.core")
    local batch_processor = require("apisix.utils.batch-processor")
    local log_buffer -- the single global batch processor
    ...
    local entry = {...} -- data to log
    if log_buffer then
        log_buffer:push(entry)
        return
    end
    local config_bat = {
        name = config.name,
        retry_delay = config.retry_delay,
        ...
    }
    local err
    -- entries is an array table of entry, which can be processed in batch
    local func = function(entries)
        ...
        return true
        -- return false, err_msg, first_fail if failed
    end
    log_buffer, err = batch_processor:new(func, config_bat)
    if not log_buffer then
        core.log.warn("error when creating the batch processor: ", err)
        return
    end
    log_buffer:push(entry)

Note: Make sure batch_max_size (the entry count) stays within what the processing function can handle in a single execution. The timer that flushes the batch runs based on the inactive_timeout configuration, so for best results keep inactive_timeout smaller than buffer_duration.
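
As a rough illustration of that last recommendation, a plugin could warn about configurations where inactive_timeout is not smaller than buffer_duration. `check_timeouts` below is a hypothetical helper written for this note, not an APISIX function.

```lua
-- Illustrative sketch only: `check_timeouts` is a hypothetical helper,
-- not an APISIX function.
local function check_timeouts(conf)
    if conf.inactive_timeout >= conf.buffer_duration then
        return false, "inactive_timeout should be smaller than buffer_duration"
    end
    return true
end

-- The documented defaults (5 and 60) satisfy the recommendation.
local ok_default = check_timeouts({ inactive_timeout = 5, buffer_duration = 60 })

-- A timeout longer than the buffer duration does not.
local ok_bad, warning = check_timeouts({ inactive_timeout = 60, buffer_duration = 30 })
```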