influxdb-client-kotlin

KDoc

The reference Kotlin client that allows query and write for the InfluxDB 2.x by Kotlin Channel coroutines.

Documentation

This section contains links to the client library documentation.

Features

Queries

The QueryKotlinApi supports asynchronous queries by Kotlin Channel coroutines.

The following example demonstrates querying using the Flux language:

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import kotlinx.coroutines.channels.consumeEach
  4. import kotlinx.coroutines.channels.filter
  5. import kotlinx.coroutines.channels.take
  6. import kotlinx.coroutines.runBlocking
  7. fun main(args: Array<String>) = runBlocking {
  8. val influxDBClient = InfluxDBClientKotlinFactory
  9. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  10. val fluxQuery = ("from(bucket: \"my-bucket\")\n"
  11. + " |> range(start: -1d)"
  12. + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")
  13. //Result is returned as a stream
  14. val results = influxDBClient.getQueryKotlinApi().query(fluxQuery)
  15. //Example of additional result stream processing on client side
  16. results
  17. //filter on client side using `filter` built-in operator
  18. .filter { "cpu0" == it.getValueByKey("cpu") }
  19. //take first 20 records
  20. .take(20)
  21. //print results
  22. .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }
  23. influxDBClient.close()
  24. }

It is possible to parse a result line-by-line using the queryRaw method:

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import kotlinx.coroutines.channels.consumeEach
  4. import kotlinx.coroutines.runBlocking
  5. fun main(args: Array<String>) = runBlocking {
  6. val influxDBClient = InfluxDBClientKotlinFactory
  7. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  8. val fluxQuery = ("from(bucket: \"my-bucket\")\n"
  9. + " |> range(start: -5m)"
  10. + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
  11. + " |> sample(n: 5, pos: 1)")
  12. //Result is returned as a stream
  13. val results = influxDBClient.getQueryKotlinApi().queryRaw(fluxQuery)
  14. //print results
  15. results.consumeEach { println("Line: $it") }
  16. influxDBClient.close()
  17. }

Writes

The WriteKotlinApi supports ingest data by:

  • DataPoint
  • LineProtocol
  • Data class
  • List of above items

The following example shows how to use various type of data:

  1. package example
  2. import com.influxdb.annotations.Column
  3. import com.influxdb.annotations.Measurement
  4. import com.influxdb.client.domain.WritePrecision
  5. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  6. import com.influxdb.client.write.Point
  7. import kotlinx.coroutines.flow.collect
  8. import kotlinx.coroutines.flow.consumeAsFlow
  9. import kotlinx.coroutines.runBlocking
  10. import java.time.Instant
  11. fun main() = runBlocking {
  12. val org = "my-org"
  13. val bucket = "my-bucket"
  14. //
  15. // Initialize client
  16. //
  17. val client = InfluxDBClientKotlinFactory
  18. .create("http://localhost:8086", "my-token".toCharArray(), org, bucket)
  19. val writeApi = client.getWriteKotlinApi()
  20. //
  21. // Write by Data Point
  22. //
  23. val point = Point.measurement("temperature")
  24. .addTag("location", "west")
  25. .addField("value", 55.0)
  26. .time(Instant.now().toEpochMilli(), WritePrecision.MS)
  27. writeApi.writePoint(point)
  28. //
  29. // Write by LineProtocol
  30. //
  31. writeApi.writeRecord("temperature,location=north value=60.0", WritePrecision.NS)
  32. //
  33. // Write by DataClass
  34. //
  35. val temperature = Temperature("south", 62.0, Instant.now())
  36. writeApi.writeMeasurement(temperature, WritePrecision.NS)
  37. //
  38. // Query results
  39. //
  40. val fluxQuery =
  41. """from(bucket: "$bucket") |> range(start: 0) |> filter(fn: (r) => (r["_measurement"] == "temperature"))"""
  42. client
  43. .getQueryKotlinApi()
  44. .query(fluxQuery)
  45. .consumeAsFlow()
  46. .collect { println("Measurement: ${it.measurement}, value: ${it.value}") }
  47. client.close()
  48. }
  49. @Measurement(name = "temperature")
  50. data class Temperature(
  51. @Column(tag = true) val location: String,
  52. @Column val value: Double,
  53. @Column(timestamp = true) val time: Instant
  54. )

Advanced Usage

Client configuration file

A client can be configured via configuration file. The configuration file has to be named as influx2.properties and has to be in root of classpath.

The following options are supported:

Property namedefaultdescription
influx2.url-the url to connect to InfluxDB
influx2.org-default destination organization for writes and queries
influx2.bucket-default destination bucket for writes
influx2.token-the token to use for the authorization
influx2.logLevelNONErest client verbosity level
influx2.readTimeout10000 msread timeout
influx2.writeTimeout10000 mswrite timeout
influx2.connectTimeout10000 mssocket timeout

The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout supports ms, s and m as unit. Default is milliseconds.

Configuration example
  1. influx2.url=http://localhost:8086
  2. influx2.org=my-org
  3. influx2.bucket=my-bucket
  4. influx2.token=my-token
  5. influx2.logLevel=BODY
  6. influx2.readTimeout=5s
  7. influx2.writeTimeout=10s
  8. influx2.connectTimeout=5s

and then:

  1. val influxDBClient = InfluxDBClientKotlinFactory.create();

Client connection string

A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

  1. val influxDBClient = InfluxDBClientKotlinFactory
  2. .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token)

The following options are supported:

Property namedefaultdescription
org-default destination organization for writes and queries
bucket-default destination bucket for writes
token-the token to use for the authorization
logLevelNONErest client verbosity level
readTimeout10000 msread timeout
writeTimeout10000 mswrite timeout
connectTimeout10000 mssocket timeout

The readTimeout, writeTimeout and connectTimeout supports ms, s and m as unit. Default is milliseconds.

Gzip support

InfluxDBClientKotlin does not enable gzip compress for http requests by default. If you want to enable gzip to reduce transfer data’s size, you can call:

  1. influxDBClient.enableGzip();

Log HTTP Request and Response

The Requests and Responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADER, BODY. Note that applying the BODY LogLevel will disable chunking while streaming and will load the whole response into memory.

  1. influxDBClient.setLogLevel(LogLevel.HEADERS)

Check the server status

Server availability can be checked using the influxDBClient.ping() endpoint.

Construct queries using the flux-dsl query builder

  1. package example
  2. import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
  3. import com.influxdb.query.dsl.Flux
  4. import com.influxdb.query.dsl.functions.restriction.Restrictions
  5. import kotlinx.coroutines.channels.consumeEach
  6. import kotlinx.coroutines.channels.filter
  7. import kotlinx.coroutines.channels.take
  8. import kotlinx.coroutines.runBlocking
  9. import java.time.temporal.ChronoUnit
  10. fun main(args: Array<String>) = runBlocking {
  11. val influxDBClient = InfluxDBClientKotlinFactory
  12. .create("http://localhost:8086", "my-token".toCharArray(), "my-org")
  13. val mem = Flux.from("my-bucket")
  14. .range(-30L, ChronoUnit.MINUTES)
  15. .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))
  16. //Result is returned as a stream
  17. val results = influxDBClient.getQueryKotlinApi().query(mem.toString())
  18. //Example of additional result stream processing on client side
  19. results
  20. //filter on client side using `filter` built-in operator
  21. .filter { (it.value as Double) > 55 }
  22. // take first 20 records
  23. .take(20)
  24. //print results
  25. .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }
  26. influxDBClient.close()
  27. }

Version

The latest version for Maven dependency:

  1. <dependency>
  2. <groupId>com.influxdb</groupId>
  3. <artifactId>influxdb-client-kotlin</artifactId>
  4. <version>5.0.0</version>
  5. </dependency>

Or when using with Gradle:

  1. dependencies {
  2. implementation "com.influxdb:influxdb-client-kotlin:5.0.0"
  3. }

Snapshot Repository

The snapshots are deployed into OSS Snapshot repository.

Maven

  1. <repository>
  2. <id>ossrh</id>
  3. <name>OSS Snapshot repository</name>
  4. <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  5. <releases>
  6. <enabled>false</enabled>
  7. </releases>
  8. <snapshots>
  9. <enabled>true</enabled>
  10. </snapshots>
  11. </repository>

Gradle

  1. repositories {
  2. maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
  3. }