Get started with InfluxDB tasks

An InfluxDB task is a scheduled Flux script that takes a stream of input data, modifies or analyzes it in some way, then stores the modified data in a new bucket or performs other actions.

This article walks through writing a basic InfluxDB task that downsamples data and stores it in a new bucket.

Components of a task

Every InfluxDB task needs the following four components: task options, a data source, data processing or transformation, and a data destination. Their form and order can vary, but all four are essential parts of a task.

Skip to the full example task script

Define task options

Task options define metadata about the task, such as its name and run schedule. The example below illustrates how task options are defined in your Flux script:

  option task = {
    name: "cqinterval15m",
    every: 1h,
    offset: 0m,
    concurrency: 1,
    retry: 5
  }

See Task configuration options for detailed information about each option.

When creating a task in the InfluxDB user interface (UI), task options are defined in form fields.
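Tasks can also run on a cron schedule instead of a fixed interval by replacing the every option with a cron option. A minimal sketch, assuming an on-the-hour schedule (the cron expression is illustrative):

```flux
option task = {
  name: "cqinterval15m",
  // Run at the top of every hour instead of using `every`
  cron: "0 * * * *"
}
```

Note that every and cron are mutually exclusive; define one or the other.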

Define a data source

Define a data source using Flux’s from() function or any other Flux input function.

For convenience, consider storing your source data in a variable that applies the required time range and any relevant filters.

  data = from(bucket: "telegraf/default")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
      r._measurement == "mem" and
      r.host == "myHost"
    )

Using task options in your Flux script

Task options are passed as part of a task option record and can be referenced in your Flux script. In the example above, the time range is defined as -task.every.

task.every is dot notation that references the every property of the task option record. Because every is defined as 1h, -task.every equates to -1h.

Using task options to define values in your Flux script can make reusing your task easier.
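Because the task option record is an ordinary Flux record, any of its properties can be referenced the same way. For example, a sketch that labels output rows with the task's name using Flux's set() function (the task_name column name is an illustrative choice):

```flux
data
  // Add a column recording which task produced each row
  |> set(key: "task_name", value: task.name)
```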

Process or transform your data

The purpose of tasks is to process or transform data in some way. What exactly happens and what form the output data takes is up to you and your specific use case.

Account for latent data with an offset

To account for latent data (like data streaming from your edge devices), use an offset in your task. For example, with the options every: 1h and offset: 5m, the task executes 5 minutes after each hour, but the query’s now() time is still set to the exact hour.
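The schedule described above might be expressed as (a sketch based on the earlier task options):

```flux
option task = {
  name: "cqinterval15m",
  // Run hourly, 5 minutes after the hour, to catch late-arriving data
  every: 1h,
  offset: 5m
}
```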

The example below illustrates a task that downsamples data by averaging set intervals. It uses the data variable defined above as the data source, windows the data into 5-minute intervals, and calculates the average of each window using the aggregateWindow() function.

  data
    |> aggregateWindow(
      every: 5m,
      fn: mean
    )

See Common tasks for examples of tasks commonly used with InfluxDB.

Define a destination

In the vast majority of task use cases, once data is transformed, it needs to be sent and stored somewhere. This could be a separate bucket or another measurement.

The example below uses Flux’s to() function to send the transformed data to another bucket:

  // ...
    |> to(bucket: "telegraf_downsampled", org: "my-org")

To write data to InfluxDB, the output data must include _time, _measurement, _field, and _value columns.
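To store the output under a different measurement rather than in a different bucket, one approach is to overwrite the _measurement column with set() before calling to(). A sketch, assuming an illustrative mem_mean measurement name:

```flux
data
  |> aggregateWindow(every: 5m, fn: mean)
  // Store the averages under a new measurement name
  |> set(key: "_measurement", value: "mem_mean")
  |> to(bucket: "telegraf_downsampled", org: "my-org")
```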

Full example task script

Below is a task script that combines all of the components described above:

  // Task options
  option task = {
    name: "cqinterval15m",
    every: 1h,
    offset: 0m,
    concurrency: 1,
    retry: 5
  }

  // Data source
  data = from(bucket: "telegraf/default")
    |> range(start: -task.every)
    |> filter(fn: (r) =>
      r._measurement == "mem" and
      r.host == "myHost"
    )

  data
    // Data transformation
    |> aggregateWindow(
      every: 5m,
      fn: mean
    )
    // Data destination
    |> to(bucket: "telegraf_downsampled", org: "my-org")