
    influxdb-client-kotlin

    KDoc

    The reference Kotlin client for InfluxDB 2.x. It supports queries and writes using Kotlin coroutines and Channels.

    Documentation

    This section contains links to the client library documentation.

    Features

    Queries

    The QueryKotlinApi supports asynchronous queries using Kotlin coroutines and Channels.

    The following example demonstrates querying using the Flux language:

    package example

    import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
    import kotlinx.coroutines.channels.consumeEach
    import kotlinx.coroutines.channels.filter
    import kotlinx.coroutines.channels.take
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) = runBlocking {
        val influxDBClient = InfluxDBClientKotlinFactory
            .create("http://localhost:8086", "my-token".toCharArray(), "my-org")

        val fluxQuery = ("from(bucket: \"my-bucket\")\n"
            + " |> range(start: -1d)"
            + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")

        // Result is returned as a stream
        val results = influxDBClient.getQueryKotlinApi().query(fluxQuery)

        // Example of additional result stream processing on the client side
        results
            // filter on the client side using the built-in `filter` operator
            .filter { "cpu0" == it.getValueByKey("cpu") }
            // take the first 20 records
            .take(20)
            // print results
            .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }

        influxDBClient.close()
    }

    It is possible to parse a result line-by-line using the queryRaw method:

    package example

    import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
    import kotlinx.coroutines.channels.consumeEach
    import kotlinx.coroutines.runBlocking

    fun main(args: Array<String>) = runBlocking {
        val influxDBClient = InfluxDBClientKotlinFactory
            .create("http://localhost:8086", "my-token".toCharArray(), "my-org")

        val fluxQuery = ("from(bucket: \"my-bucket\")\n"
            + " |> range(start: -5m)"
            + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
            + " |> sample(n: 5, pos: 1)")

        // Result is returned as a stream of lines
        val results = influxDBClient.getQueryKotlinApi().queryRaw(fluxQuery)

        // print results
        results.consumeEach { println("Line: $it") }

        influxDBClient.close()
    }
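When consuming raw lines, it is often useful to separate data rows from metadata. The following is a minimal sketch, assuming the raw response is Flux annotated CSV, in which annotation rows start with `#`. The function `splitRawLine` is a hypothetical helper and not part of the client library, and its naive comma split does not handle quoted cells that contain commas.

```kotlin
// Hypothetical helper, not part of influxdb-client-kotlin.
// Returns the comma-separated cells of a header or data row, or null for
// blank lines and annotation rows (assumed to start with '#').
// Naive split: quoted cells containing commas are NOT handled.
fun splitRawLine(line: String): List<String>? {
    if (line.isBlank() || line.startsWith("#")) return null
    return line.split(",")
}
```

Inside the `consumeEach` block above, a call such as `splitRawLine(it)?.let { cells -> println(cells) }` would print only the non-annotation rows.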

    Writes

    The WriteKotlinApi supports ingesting data as:

    • DataPoint
    • LineProtocol
    • Data class
    • List of above items

    The following example shows how to write each of these data types:

    package example

    import com.influxdb.annotations.Column
    import com.influxdb.annotations.Measurement
    import com.influxdb.client.domain.WritePrecision
    import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
    import com.influxdb.client.write.Point
    import kotlinx.coroutines.flow.collect
    import kotlinx.coroutines.flow.consumeAsFlow
    import kotlinx.coroutines.runBlocking
    import java.time.Instant

    fun main() = runBlocking {
        val org = "my-org"
        val bucket = "my-bucket"

        //
        // Initialize client
        //
        val client = InfluxDBClientKotlinFactory
            .create("http://localhost:8086", "my-token".toCharArray(), org, bucket)

        val writeApi = client.getWriteKotlinApi()

        //
        // Write by Data Point
        //
        val point = Point.measurement("temperature")
            .addTag("location", "west")
            .addField("value", 55.0)
            .time(Instant.now().toEpochMilli(), WritePrecision.MS)
        writeApi.writePoint(point)

        //
        // Write by LineProtocol
        //
        writeApi.writeRecord("temperature,location=north value=60.0", WritePrecision.NS)

        //
        // Write by Data class
        //
        val temperature = Temperature("south", 62.0, Instant.now())
        writeApi.writeMeasurement(temperature, WritePrecision.NS)

        //
        // Query results
        //
        val fluxQuery =
            """from(bucket: "$bucket") |> range(start: 0) |> filter(fn: (r) => (r["_measurement"] == "temperature"))"""

        client
            .getQueryKotlinApi()
            .query(fluxQuery)
            .consumeAsFlow()
            .collect { println("Measurement: ${it.measurement}, value: ${it.value}") }

        client.close()
    }

    @Measurement(name = "temperature")
    data class Temperature(
        @Column(tag = true) val location: String,
        @Column val value: Double,
        @Column(timestamp = true) val time: Instant
    )
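To make the line protocol string passed to `writeRecord` less opaque, the sketch below assembles such a record by hand. It is an illustration only: `escapeKey` and `toLineProtocol` are hypothetical helpers and not part of the client library, they handle only `Double` fields, and they omit the optional timestamp (in which case the server assigns one). For real writes, the `Point` class shown above is the safer choice.

```kotlin
// Hypothetical helpers for illustration; not part of influxdb-client-kotlin.

// Escapes the characters that are special in tag keys, tag values and field keys.
fun escapeKey(text: String): String =
    text.replace(",", "\\,").replace("=", "\\=").replace(" ", "\\ ")

// Builds "measurement,tag=value field=value" for Double fields, without a timestamp.
fun toLineProtocol(
    measurement: String,
    tags: Map<String, String>,
    fields: Map<String, Double>
): String {
    require(fields.isNotEmpty()) { "Line protocol needs at least one field" }
    // Measurement names escape commas and spaces, but not '='.
    val name = measurement.replace(",", "\\,").replace(" ", "\\ ")
    // Tags are sorted by key, which InfluxDB recommends for write performance.
    val tagPart = tags.toSortedMap().entries
        .joinToString("") { (k, v) -> ",${escapeKey(k)}=${escapeKey(v)}" }
    val fieldPart = fields.entries
        .joinToString(",") { (k, v) -> "${escapeKey(k)}=$v" }
    return "$name$tagPart $fieldPart"
}
```

Calling `toLineProtocol("temperature", mapOf("location" to "north"), mapOf("value" to 60.0))` yields the same record that the example above writes with `writeRecord`.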

    Advanced Usage

    Client configuration file

    A client can be configured via a configuration file. The file must be named influx2.properties and must be located in the root of the classpath.

    The following options are supported:

    | Property name          | Default  | Description                                                |
    |------------------------|----------|------------------------------------------------------------|
    | influx2.url            | -        | the url to connect to InfluxDB                             |
    | influx2.org            | -        | default destination organization for writes and queries   |
    | influx2.bucket         | -        | default destination bucket for writes                      |
    | influx2.token          | -        | the token to use for the authorization                     |
    | influx2.logLevel       | NONE     | rest client verbosity level                                |
    | influx2.readTimeout    | 10000 ms | read timeout                                               |
    | influx2.writeTimeout   | 10000 ms | write timeout                                              |
    | influx2.connectTimeout | 10000 ms | socket timeout                                             |
    | influx2.precision      | NS       | default precision for unix timestamps in the line protocol |
    | influx2.clientType     | -        | to customize the User-Agent HTTP header                    |

    The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout options accept ms, s and m as units. A value without a unit is interpreted as milliseconds.
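The unit rule for timeouts can be illustrated with a small sketch. The function `parseTimeoutMillis` is a hypothetical helper written for this explanation; it is not the client's own parser and only mirrors the behaviour described above (ms, s and m suffixes, milliseconds when no unit is given).

```kotlin
// Hypothetical helper, not part of influxdb-client-kotlin: converts a timeout
// value such as "5s", "10000" or "2m" into milliseconds.
fun parseTimeoutMillis(raw: String): Long {
    val value = raw.trim().lowercase()
    // Check "ms" before "s" and "m" so that "500ms" is not misread.
    val (digits, factor) = when {
        value.endsWith("ms") -> value.dropLast(2) to 1L
        value.endsWith("s") -> value.dropLast(1) to 1_000L
        value.endsWith("m") -> value.dropLast(1) to 60_000L
        else -> value to 1L // no unit: milliseconds by default
    }
    val amount = digits.trim().toLongOrNull()
        ?: throw IllegalArgumentException("Not a valid timeout: '$raw'")
    require(amount >= 0) { "Timeout must not be negative: '$raw'" }
    return amount * factor
}
```

Under these assumptions, `parseTimeoutMillis("5s")` evaluates to 5000 and `parseTimeoutMillis("10000")` to 10000.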

    Configuration example

    influx2.url=http://localhost:8086
    influx2.org=my-org
    influx2.bucket=my-bucket
    influx2.token=my-token
    influx2.logLevel=BODY
    influx2.readTimeout=5s
    influx2.writeTimeout=10s
    influx2.connectTimeout=5s

    and then:

    val influxDBClient = InfluxDBClientKotlinFactory.create()

    Client connection string

    A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

    val influxDBClient = InfluxDBClientKotlinFactory
        .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", "my-token".toCharArray())

    The following options are supported:

    | Property name  | Default  | Description                                                |
    |----------------|----------|------------------------------------------------------------|
    | org            | -        | default destination organization for writes and queries   |
    | bucket         | -        | default destination bucket for writes                      |
    | token          | -        | the token to use for the authorization                     |
    | logLevel       | NONE     | rest client verbosity level                                |
    | readTimeout    | 10000 ms | read timeout                                               |
    | writeTimeout   | 10000 ms | write timeout                                              |
    | connectTimeout | 10000 ms | socket timeout                                             |
    | precision      | NS       | default precision for unix timestamps in the line protocol |
    | clientType     | -        | to customize the User-Agent HTTP header                    |

    The readTimeout, writeTimeout and connectTimeout options accept ms, s and m as units. A value without a unit is interpreted as milliseconds.
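When several options are set, assembling the connection string by hand becomes error-prone. The sketch below shows one possible way to build it from a map of options. `buildConnectionString` is a hypothetical helper, not part of the client library; it simply appends URL-encoded query parameters to the base URL.

```kotlin
import java.net.URLEncoder

// Hypothetical helper, not part of influxdb-client-kotlin: appends options
// to a base URL as URL-encoded query parameters, in the map's iteration order.
fun buildConnectionString(baseUrl: String, options: Map<String, String>): String {
    if (options.isEmpty()) return baseUrl
    val query = options.entries.joinToString("&") { (key, value) ->
        "${URLEncoder.encode(key, "UTF-8")}=${URLEncoder.encode(value, "UTF-8")}"
    }
    return "$baseUrl?$query"
}
```

The result can be passed as the first argument of `InfluxDBClientKotlinFactory.create`, with the token supplied separately as in the example above.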

    Gzip support

    InfluxDBClientKotlin does not enable gzip compression for HTTP requests by default. To enable gzip and reduce the size of transferred data, call:

    influxDBClient.enableGzip()

    Log HTTP Request and Response

    HTTP requests and responses can be logged by changing the LogLevel. The LogLevel values are NONE, BASIC, HEADERS and BODY. Note that the BODY LogLevel disables chunking while streaming and loads the whole response into memory.

    influxDBClient.setLogLevel(LogLevel.HEADERS)

    Check the server status

    Server availability can be checked by calling influxDBClient.ping().

    Construct queries using the flux-dsl query builder

    package example

    import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
    import com.influxdb.query.dsl.Flux
    import com.influxdb.query.dsl.functions.restriction.Restrictions
    import kotlinx.coroutines.channels.consumeEach
    import kotlinx.coroutines.channels.filter
    import kotlinx.coroutines.channels.take
    import kotlinx.coroutines.runBlocking
    import java.time.temporal.ChronoUnit

    fun main(args: Array<String>) = runBlocking {
        val influxDBClient = InfluxDBClientKotlinFactory
            .create("http://localhost:8086", "my-token".toCharArray(), "my-org")

        val mem = Flux.from("my-bucket")
            .range(-30L, ChronoUnit.MINUTES)
            .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))

        // Result is returned as a stream
        val results = influxDBClient.getQueryKotlinApi().query(mem.toString())

        // Example of additional result stream processing on the client side
        results
            // filter on the client side using the built-in `filter` operator
            .filter { (it.value as Double) > 55 }
            // take the first 20 records
            .take(20)
            // print results
            .consumeEach { println("Measurement: ${it.measurement}, value: ${it.value}") }

        influxDBClient.close()
    }

    Version

    The latest version for Maven dependency:

    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb-client-kotlin</artifactId>
        <version>6.9.0</version>
    </dependency>

    Or when using with Gradle:

    dependencies {
        implementation "com.influxdb:influxdb-client-kotlin:6.9.0"
    }

    Snapshot Repository

    Snapshots are deployed to the OSS Snapshot repository.

    Maven

    <repository>
        <id>ossrh</id>
        <name>OSS Snapshot repository</name>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
        <releases>
            <enabled>false</enabled>
        </releases>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>

    Gradle

    repositories {
        maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
    }