influxdb-client-r

This repository contains the R client package for InfluxDB 2.0.

Features

The InfluxDB 2.0 client supports:

  • Querying data
  • Writing data
  • Getting status

Documentation

This section contains links to the client library documentation.

Installing

The package requires R >= 3.3.

Installing dependencies

    install.packages(c("httr", "bit64", "nanotime", "plyr"))

Installing influxdbclient package

The package is published on CRAN and can be installed with

    install.packages("influxdbclient")

The latest development version can be installed with

    # install.packages("remotes")
    remotes::install_github("influxdata/influxdb-client-r")

Usage

Client instantiation

    library(influxdbclient)
    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")

Parameters

Parameter  Description            Type       Default
url        InfluxDB instance URL  character  none
token      authentication token   character  none
org        organization name      character  none

Hint: to avoid SSL certificate validation errors when accessing an InfluxDB instance over https (such as "SSL certificate problem: unable to get local issuer certificate"), you can disable the validation with the following call before using any InfluxDBClient method. Warning: this disables peer certificate validation for the whole current R session.

    library(httr)
    httr::set_config(config(ssl_verifypeer = FALSE))

Querying data

Use the query method.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    data <- client$query('from(bucket: "my-bucket") |> range(start: -1h) |> drop(columns: ["_start", "_stop"])')
    data

A Flux query can yield multiple results in one response, and each result may contain multiple tables.
The return value is therefore a named list, where each element is a list of data frames that represents one result. Each data frame represents a Flux table. You can list the results using the names function.

Quite often, though, there is just a single result, so by default query flattens the return value to a simple unnamed list of data frames. This behaviour is controlled by the flatSingleResult parameter. With flatSingleResult = FALSE, you can check that the return value contains one element named "_result" (the default result name when there is no explicit yield in the query) and use that name to retrieve it, for example:

    > names(data)
    [1] "_result"
    > data[["_result"]]
    [[1]]
                           time       name region sensor_id altitude grounded temperature
    1 2021-06-09T09:52:41+00:00 airSensors  south   TLM0101      549    FALSE  71.7844100
    2 2021-06-09T09:52:51+00:00 airSensors  south   TLM0101      547    FALSE  71.7684399
    3 2021-06-09T09:53:01+00:00 airSensors  south   TLM0101      563     TRUE  71.7819928
    4 2021-06-09T09:53:11+00:00 airSensors  south   TLM0101      560     TRUE  71.7487767
    5 2021-06-09T09:53:21+00:00 airSensors  south   TLM0101      544    FALSE  71.7335579
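
The nesting of results and tables can be explored without a running server. The following base R sketch builds a hand-made stand-in for a query return value and shows how the named-list form and the flattened form relate. The data frame contents and all variable names are illustrative assumptions; nothing here is produced by the package.

```r
# Hand-made stand-in for one Flux table (values are illustrative only).
table1 <- data.frame(
  time = as.POSIXct(c("2021-06-09 09:52:41", "2021-06-09 09:52:51"), tz = "UTC"),
  `_value` = c(71.78441, 71.7684399),
  check.names = FALSE  # keep the leading underscore in `_value`
)

# Shape of the return value with flatSingleResult = FALSE:
# a named list of results, each result being a list of data frames.
result <- list("_result" = list(table1))
names(result)

# Retrieve the tables of one result by name.
tables <- result[["_result"]]

# Shape with flatSingleResult = TRUE: the single result is returned directly.
flat <- if (length(result) == 1) result[[1]] else result

# Row-bind all tables of a result when they share the same columns.
combined <- do.call(rbind, tables)
```

When a result holds several tables with identical columns, row-binding them as in the last line is often the most convenient form for further analysis.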

Parameters

Parameter         Description                                                            Type        Default
text              Flux query                                                             character   none
POSIXctCol        Flux time to POSIXct column mapping                                    named list  c("_time"="time")
flatSingleResult  Whether to return simple list when response contains only one result  logical     TRUE

Incoming type mapping

Flux type  R type
string     character
int        integer64
float      numeric
bool       logical
time       nanotime

Using retrieved data as time series

Flux timestamps are parsed into the nanotime type (integer64 underneath), because R datetime types do not support nanosecond precision. nanotime is not a time-based object appropriate for creating a time series, though. By default, query coerces the _time column to a time column of POSIXct type (see the POSIXctCol parameter), with a possible loss of precision (which is unimportant in the context of R time series).
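
The precision loss can be seen in a small base R sketch. It does not use the nanotime package; instead it keeps an illustrative nanosecond timestamp as text, converts it to POSIXct (which stores seconds as a double), and compares the recovered fraction with the original. The timestamp value and variable names are assumptions made for the example.

```r
# A nanosecond timestamp kept as text so no digits are lost (illustrative value).
ns_text <- "1623232361123456789"

# Split into whole seconds and the nanosecond remainder.
secs    <- as.numeric(substr(ns_text, 1, 10))
frac_ns <- as.numeric(substr(ns_text, 11, 19))

# POSIXct stores seconds since the epoch as a double.
t <- as.POSIXct(secs + frac_ns / 1e9, origin = "1970-01-01", tz = "UTC")
format(t, "%Y-%m-%d %H:%M:%OS6")

# Recover the fractional part: close to, but not necessarily equal to, frac_ns.
recovered_ns <- round((as.numeric(t) - secs) * 1e9)
abs(recovered_ns - frac_ns)
```

For timestamps near the present, a double resolves to roughly a few hundred nanoseconds, which is far finer than anything a typical time series analysis needs.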

Select the data of interest from the result, for example:

    # from the first data frame, pick subset containing `time` and `_value` columns only
    df1 <- data[[1]][c("time", "_value")]

Then, a time series object can be created from the data frame, e.g. using the tsbox package:

    library(tsbox)
    ts1 <- ts_ts(ts_df(df1))

A data frame, or a time series object created from it, can be used for decomposition, anomaly detection, etc., for example:

    df1$`_value` %>% ts(freq=168) %>% stl(s.window=13) %>% autoplot()

or

    ts1 %>% ts(freq=168) %>% stl(s.window=13) %>% autoplot()
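
To try the decomposition step without a database, the following sketch uses only base R (the stats package) on a synthetic series. The data are generated, not queried: four weeks of hourly values with a weekly cycle are an assumption chosen so that a frequency of 168 makes sense, and plot() stands in for autoplot().

```r
# Synthetic hourly series: four weeks with a weekly (168-hour) cycle plus noise.
set.seed(1)
n <- 4 * 168
df1 <- data.frame(
  time = as.POSIXct("2021-06-01 00:00:00", tz = "UTC") + 3600 * (seq_len(n) - 1),
  `_value` = 70 + 2 * sin(2 * pi * seq_len(n) / 168) + rnorm(n, sd = 0.2),
  check.names = FALSE  # keep the leading underscore in `_value`
)

# Regular series with 168 observations per seasonal period.
x <- ts(df1$`_value`, frequency = 168)

# Seasonal-trend decomposition using loess (stats::stl).
fit <- stl(x, s.window = 13)
head(fit$time.series)  # columns: seasonal, trend, remainder
plot(fit)
```

Note that stl needs at least two full periods of data, so with a frequency of 168 the query range should cover more than two weeks of hourly points.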

Writing data

Use the write method.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    data <- ...
    response <- client$write(data, bucket = "my-bucket", precision = "us",
                             measurementCol = "name",
                             tagCols = c("region", "sensor_id"),
                             fieldCols = c("altitude", "temperature"),
                             timeCol = "time")

The example is valid for data.frame data like the following:

    > print(data)
                           time       name region sensor_id altitude grounded temperature
    1 2021-06-09T09:52:41+00:00 airSensors  south   TLM0101      549    FALSE  71.7844100
    2 2021-06-09T09:52:51+00:00 airSensors  south   TLM0101      547    FALSE  71.7684399
    3 2021-06-09T09:53:01+00:00 airSensors  south   TLM0101      563     TRUE  71.7819928
    4 2021-06-09T09:53:11+00:00 airSensors  south   TLM0101      560     TRUE  71.7487767
    5 2021-06-09T09:53:21+00:00 airSensors  south   TLM0101      544    FALSE  71.7335579
    > str(data)
    'data.frame': 5 obs. of 7 variables:
     $ time       :integer64 1623232361000000000 1623232371000000000 1623232381000000000 1623232391000000000 1623232401000000000
     $ name       : chr "airSensors" "airSensors" "airSensors" "airSensors" ...
     $ region     : chr "south" "south" "south" "south" ...
     $ sensor_id  : chr "TLM0101" "TLM0101" "TLM0101" "TLM0101" ...
     $ altitude   :integer64 549 547 563 560 544
     $ grounded   : logi FALSE FALSE TRUE TRUE FALSE
     $ temperature: num 71.8 71.8 71.8 71.7 71.7

Parameters

Parameter       Description              Type                              Default
x               data                     data.frame (or list of)           none
bucket          target bucket name       character                         none
batchSize       batch size               numeric                           5000
precision       timestamp precision      character (one of s, ms, us, ns)  none
measurementCol  measurement column name  character                         "_measurement"
tagCols         tags column names        character                         NULL
fieldCols       fields column names      character                         c("_field"="_value")
timeCol         time column name         character                         "_time"
object          output object            character                         NULL

Supported time column value types: nanotime, POSIXct.

The response is either NULL on success, or an error otherwise.

Note: the default fieldCols value is suitable for writing back unpivoted data previously retrieved from InfluxDB. For usual tables ("pivoted" in the Flux world), fieldCols should be an unnamed list, e.g. c("humidity", "temperature", ...).
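
The difference between the two shapes mentioned in the note can be seen in a small base R sketch. The humidity and temperature readings are invented for illustration, the variable names are hypothetical, and reshape() from the stats package is just one way to pivot; nothing here calls the client.

```r
# Unpivoted shape: one row per (time, field) pair.
stamps <- as.POSIXct(c("2021-06-09 09:52:41", "2021-06-09 09:52:51"), tz = "UTC")
unpivoted <- data.frame(
  `_time`  = rep(stamps, times = 2),
  `_field` = rep(c("humidity", "temperature"), each = 2),
  `_value` = c(35.1, 35.3, 71.8, 71.7),  # made-up readings
  check.names = FALSE,
  stringsAsFactors = FALSE
)

# Pivoted shape: one row per time, one column per field.
pivoted <- reshape(unpivoted, idvar = "_time", timevar = "_field",
                   v.names = "_value", direction = "wide")
names(pivoted) <- sub("^_value\\.", "", names(pivoted))

# fieldCols value matching each shape, following the note above.
field_cols_unpivoted <- c("_field" = "_value")          # field name and value columns
field_cols_pivoted   <- c("humidity", "temperature")    # one column per field
```

In the unpivoted case the named vector says which column carries the field name and which carries its value; in the pivoted case each listed column is itself a field.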

Outgoing type mapping

R type              InfluxDB type
character           string
integer, integer64  int
numeric             float
logical             bool
nanotime, POSIXct   time

Output preview

To preview how input data are serialized to InfluxDB line protocol, pass the name of the object that should receive the output as the object parameter value.
This turns write into a dry-run operation (nothing is sent to the database). The object is assigned in the calling environment.
This option is intended for debugging purposes.

    data <- ...
    response <- client$write(data, bucket = "my-bucket", precision = "us",
                             measurementCol = "name",
                             tagCols = c("region", "sensor_id"),
                             fieldCols = c("altitude", "temperature"),
                             timeCol = "time",
                             object = "lp")
    lp

Sample output:

    > print(lp)
    [[1]]
    [1] "airSensors,region=south,sensor_id=TLM0101 altitude=549i,temperature=71.7844100 1623232361000000"
    [2] "airSensors,region=south,sensor_id=TLM0101 altitude=547i,temperature=71.7684399 1623232371000000"
    [3] "airSensors,region=south,sensor_id=TLM0101 altitude=563i,temperature=71.7819928 1623232381000000"
    [4] "airSensors,region=south,sensor_id=TLM0101 altitude=560i,temperature=71.7487767 1623232391000000"
    [5] "airSensors,region=south,sensor_id=TLM0101 altitude=544i,temperature=71.7335579 1623232401000000"
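
To make the layout of these lines easier to read (measurement, comma-separated tags, a space, comma-separated fields, a space, timestamp), here is a minimal base R sketch that assembles similar lines from a data frame. The helper to_line_protocol is hypothetical and is not the package's implementation: it assumes at least one tag column, a POSIXct time column, microsecond precision, and it skips the escaping of special characters that real line protocol requires.

```r
# Hypothetical helper, for illustration only (not part of influxdbclient).
to_line_protocol <- function(df, measurementCol, tagCols, fieldCols, timeCol) {
  # Render one column of field values using line protocol literal syntax.
  format_field <- function(x) {
    if (is.integer(x)) return(paste0(x, "i"))            # integers carry an "i" suffix
    if (is.logical(x)) return(ifelse(x, "true", "false"))
    if (is.character(x)) return(paste0('"', x, '"'))     # strings are double-quoted
    as.character(x)                                      # floats are written as-is
  }
  # Build "col=value,col=value" for every row.
  key_values <- function(cols, fmt) {
    parts <- lapply(cols, function(col) paste0(col, "=", fmt(df[[col]])))
    do.call(paste, c(parts, sep = ","))
  }
  tags      <- key_values(tagCols, as.character)
  fields    <- key_values(fieldCols, format_field)
  timestamp <- sprintf("%.0f", as.numeric(df[[timeCol]]) * 1e6)  # seconds -> microseconds
  paste0(df[[measurementCol]], ",", tags, " ", fields, " ", timestamp)
}

data <- data.frame(
  time = as.POSIXct(c("2021-06-09 09:52:41", "2021-06-09 09:52:51"), tz = "UTC"),
  name = "airSensors",
  region = "south",
  sensor_id = "TLM0101",
  altitude = c(549L, 547L),
  temperature = c(71.78441, 71.7684399),
  stringsAsFactors = FALSE
)

lp <- to_line_protocol(data, measurementCol = "name",
                       tagCols = c("region", "sensor_id"),
                       fieldCols = c("altitude", "temperature"),
                       timeCol = "time")
lp
```

The sketch prints floats in R's shortest form, so trailing zeros may differ from the package's output; the structure of each line is the point of the comparison.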

Write retrying

By default, the client does not retry failed writes. To instantiate a client with retry support, pass an instance of RetryOptions, e.g.:

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org",
                                 retryOptions = RetryOptions$new(maxAttempts = 3))

For a retry strategy with default options, just pass TRUE as the retryOptions parameter value:

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org",
                                 retryOptions = TRUE)

Retryable InfluxDB write errors are those with status codes 429 and 503. The retry strategy implements an exponential backoff algorithm, customizable with RetryOptions.
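
As a rough illustration of what exponential backoff means, the base R sketch below computes a capped sequence of waiting times and checks whether a status code is retryable. The function names, parameter names other than maxAttempts, and the default values are assumptions made for this example; they are not taken from RetryOptions, and the package's actual strategy may differ (for instance by adding random jitter).

```r
# Hypothetical helpers, for illustration only (not part of influxdbclient).

# Delay in seconds before each retry: base * factor^(attempt - 1), capped at maxDelay.
backoff_delays <- function(maxAttempts = 3, baseInterval = 5,
                           exponentialBase = 2, maxDelay = 125) {
  attempt <- seq_len(maxAttempts)
  pmin(baseInterval * exponentialBase^(attempt - 1), maxDelay)
}

# Status codes the text above names as retryable.
is_retryable <- function(status) status %in% c(429, 503)

backoff_delays(maxAttempts = 5)  # 5 10 20 40 80
is_retryable(c(429, 500, 503))   # TRUE FALSE TRUE
```

Each delay doubles until it reaches the cap, which keeps a struggling server from being hit at a constant rate while still bounding the total wait.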

Getting status

Health status

Use the health method to get the health status.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    check <- client$health()

The response is a list with health information elements (name, status, version, commit), or an error.

Readiness status

Use the ready method to get the readiness status.

    client <- InfluxDBClient$new(url = "http://localhost:8086",
                                 token = "my-token",
                                 org = "my-org")
    check <- client$ready()

The response is a list with status elements (status, started, up), or an error.

Advanced

The client automatically follows HTTP redirects.

To use the client with a proxy, configure the proxy with set_config:

    library(httr)
    httr::set_config(
      use_proxy(url = "my-proxy", port = 8080, username = "user", password = "password")
    )

Known Issues

Contributing

Contributions are most welcome. The fastest way to get something fixed is to open a PR.

License

The client is available as open source under the terms of the MIT License.