• influxdb-client-scala
    • Features
    • Queries
    • Advanced Usage
      • Client configuration file
        • Configuration example
    • Client connection string
    • Gzip support
    • Log HTTP Request and Response
    • Check the server status
    • Construct queries using the flux-dsl query builder
  • Version
    • Snapshot Repository
      • Maven
      • Gradle

    influxdb-client-scala

    The reference Scala client for querying and writing InfluxDB 2.0 data, built on Akka Streams.

    Features

    Queries

    The QueryScalaApi is based on Akka Streams. Streaming can be configured with:

    • bufferSize - Size of a buffer for incoming responses. Default 10000.
    • overflowStrategy - Strategy that is used when incoming response cannot fit inside the buffer. Default akka.stream.OverflowStrategies.Backpressure.

        val fluxClient = InfluxDBClientScalaFactory.create(options, 5000, OverflowStrategy.dropTail)

    The following example demonstrates querying using the Flux language:

        package example

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.Sink
        import com.influxdb.client.scala.InfluxDBClientScalaFactory
        import com.influxdb.query.FluxRecord

        import scala.concurrent.Await
        import scala.concurrent.duration.Duration

        object InfluxDB2ScalaExample {

          implicit val system: ActorSystem = ActorSystem("it-tests")

          def main(args: Array[String]): Unit = {

            val influxDBClient = InfluxDBClientScalaFactory
              .create("http://localhost:8086", "my-token".toCharArray, "my-org")

            val fluxQuery = ("from(bucket: \"my-bucket\")\n"
              + " |> range(start: -1d)"
              + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))")

            // Result is returned as a stream
            val results = influxDBClient.getQueryScalaApi().query(fluxQuery)

            // Example of additional result stream processing on client side
            val sink = results
              // filter on client side using `filter` built-in operator
              .filter(it => "cpu0" == it.getValueByKey("cpu"))
              // take first 20 records
              .take(20)
              // print results
              .runWith(Sink.foreach[FluxRecord](it =>
                println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

            // wait to finish
            Await.result(sink, Duration.Inf)

            influxDBClient.close()
            system.terminate()
          }
        }

    It is possible to parse a result line-by-line using the queryRaw method:

        package example

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.Sink
        import com.influxdb.client.scala.InfluxDBClientScalaFactory

        import scala.concurrent.Await
        import scala.concurrent.duration.Duration

        object InfluxDB2ScalaExampleRaw {

          implicit val system: ActorSystem = ActorSystem("it-tests")

          def main(args: Array[String]): Unit = {

            val influxDBClient = InfluxDBClientScalaFactory
              .create("http://localhost:8086", "my-token".toCharArray, "my-org")

            val fluxQuery = ("from(bucket: \"my-bucket\")\n"
              + " |> range(start: -5m)"
              + " |> filter(fn: (r) => (r[\"_measurement\"] == \"cpu\" and r[\"_field\"] == \"usage_system\"))"
              + " |> sample(n: 5, pos: 1)")

            // Result is returned as a stream
            val sink = influxDBClient.getQueryScalaApi().queryRaw(fluxQuery)
              // print results
              .runWith(Sink.foreach[String](it => println(s"Line: $it")))

            // wait to finish
            Await.result(sink, Duration.Inf)

            influxDBClient.close()
            system.terminate()
          }
        }

    Advanced Usage

    Client configuration file

    A client can be configured via a configuration file. The file must be named influx2.properties and must be located in the root of the classpath.

    The following options are supported:

    | Property name          | default  | description                                             |
    |------------------------|----------|---------------------------------------------------------|
    | influx2.url            | -        | the url to connect to InfluxDB                          |
    | influx2.org            | -        | default destination organization for writes and queries |
    | influx2.bucket         | -        | default destination bucket for writes                   |
    | influx2.token          | -        | the token to use for the authorization                  |
    | influx2.logLevel       | NONE     | rest client verbosity level                             |
    | influx2.readTimeout    | 10000 ms | read timeout                                            |
    | influx2.writeTimeout   | 10000 ms | write timeout                                           |
    | influx2.connectTimeout | 10000 ms | socket timeout                                          |

    The influx2.readTimeout, influx2.writeTimeout and influx2.connectTimeout properties support ms, s and m as units. The default unit is milliseconds.
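
The unit rule above can be illustrated with a small sketch. `TimeoutSketch` and `parseTimeout` are hypothetical names introduced here for illustration only; they are not part of the client, which does its own parsing. The sketch assumes a value is a whole number optionally followed by `ms`, `s` or `m`.

```scala
import scala.concurrent.duration._

// Hypothetical helper for illustration; not the client's own parser.
object TimeoutSketch {

  // A whole number followed by an optional unit suffix (ms, s or m).
  private val Pattern = """^(\d+)\s*(ms|s|m)?$""".r

  def parseTimeout(raw: String): FiniteDuration = raw.trim match {
    case Pattern(amount, unit) =>
      // An unmatched optional group is null, so fall back to milliseconds.
      Option(unit).getOrElse("ms") match {
        case "ms" => amount.toLong.millis
        case "s"  => amount.toLong.seconds
        case "m"  => amount.toLong.minutes
      }
    case other =>
      throw new IllegalArgumentException(s"Unsupported timeout value: $other")
  }
}
```

Under these assumptions, `TimeoutSketch.parseTimeout("5s")` and `TimeoutSketch.parseTimeout("5000")` describe the same duration.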

    Configuration example

        influx2.url=http://localhost:8086
        influx2.org=my-org
        influx2.bucket=my-bucket
        influx2.token=my-token
        influx2.logLevel=BODY
        influx2.readTimeout=5s
        influx2.writeTimeout=10s
        influx2.connectTimeout=5s

    and then:

        val influxDBClient = InfluxDBClientScalaFactory.create()
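
When a client created this way does not behave as expected, it can help to check which influx2.* entries a properties file actually contains. The following is a minimal sketch using only `java.util.Properties`; `Influx2PropertiesSketch` and `influxSettings` are hypothetical names, and the client performs its own loading independently of this helper.

```scala
import java.io.{Reader, StringReader}
import java.util.Properties

// Hypothetical helper for illustration; the client loads its configuration itself.
object Influx2PropertiesSketch {

  // Parses properties text and keeps only the entries whose key starts with "influx2.".
  def influxSettings(reader: Reader): Map[String, String] = {
    val props = new Properties()
    props.load(reader)

    val names = props.stringPropertyNames().iterator()
    var settings = Map.empty[String, String]
    while (names.hasNext) {
      val name = names.next()
      if (name.startsWith("influx2.")) settings += (name -> props.getProperty(name))
    }
    settings
  }
}
```

To inspect the real file, a reader over `getClass.getResourceAsStream("/influx2.properties")` could be passed instead of a `StringReader`, assuming the file is present in the root of the classpath.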

    Client connection string

    A client can be constructed using a connection string that can contain the InfluxDBClientOptions parameters encoded into the URL.

        val influxDBClient = InfluxDBClientScalaFactory
          .create("http://localhost:8086?readTimeout=5000&connectTimeout=5000&logLevel=BASIC", token)

    The following options are supported:

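    | Property name  | default  | description                                             |
    |----------------|----------|---------------------------------------------------------|
    | org            | -        | default destination organization for writes and queries |
    | bucket         | -        | default destination bucket for writes                   |
    | token          | -        | the token to use for the authorization                  |
    | logLevel       | NONE     | rest client verbosity level                             |
    | readTimeout    | 10000 ms | read timeout                                            |
    | writeTimeout   | 10000 ms | write timeout                                           |
    | connectTimeout | 10000 ms | socket timeout                                          |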

    The readTimeout, writeTimeout and connectTimeout parameters support ms, s and m as units. The default unit is milliseconds.
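
To make the encoding concrete, the sketch below extracts the options from a connection string with `java.net.URI`. `ConnectionStringSketch` and `queryParams` are hypothetical names used for illustration; the client parses the connection string itself, and this helper only shows how the parameters sit in the URL query part.

```scala
import java.net.URI

// Hypothetical helper for illustration; not the client's own parser.
object ConnectionStringSketch {

  // Returns the query parameters of a connection string as key/value pairs.
  def queryParams(connectionString: String): Map[String, String] = {
    // getQuery returns null when the URL has no query part.
    val query = Option(new URI(connectionString).getQuery).getOrElse("")
    query
      .split("&")
      .toList
      .map(_.split("=", 2))
      // keep only well-formed key=value pairs
      .collect { case Array(key, value) => key -> value }
      .toMap
  }
}
```

For the connection string shown earlier, the resulting map contains the keys `readTimeout`, `connectTimeout` and `logLevel`.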

    Gzip support

    InfluxDBClientScala does not enable gzip compression for HTTP requests by default. To enable gzip and reduce the size of transferred data, call:

        influxDBClient.enableGzip()

    Log HTTP Request and Response

    Requests and responses can be logged by changing the LogLevel. LogLevel values are NONE, BASIC, HEADERS and BODY. Note that applying the BODY LogLevel disables chunking while streaming and loads the whole response into memory.

        influxDBClient.setLogLevel(LogLevel.HEADERS)

    Check the server status

    Server availability can be checked using the influxDBClient.health() endpoint.

    Construct queries using the flux-dsl query builder

        package example

        import java.time.temporal.ChronoUnit

        import akka.actor.ActorSystem
        import akka.stream.scaladsl.Sink
        import com.influxdb.client.scala.InfluxDBClientScalaFactory
        import com.influxdb.query.FluxRecord
        import com.influxdb.query.dsl.Flux
        import com.influxdb.query.dsl.functions.restriction.Restrictions

        import scala.concurrent.Await
        import scala.concurrent.duration.Duration

        object InfluxDB2ScalaExampleDSL {

          implicit val system: ActorSystem = ActorSystem("it-tests")

          // explicit `: Unit =` replaces the deprecated procedure syntax
          def main(args: Array[String]): Unit = {

            val influxDBClient = InfluxDBClientScalaFactory
              .create("http://localhost:8086", "my-token".toCharArray, "my-org")

            val mem = Flux.from("my-bucket")
              .range(-30L, ChronoUnit.MINUTES)
              .filter(Restrictions.and(Restrictions.measurement().equal("mem"), Restrictions.field().equal("used_percent")))

            // Result is returned as a stream
            val results = influxDBClient.getQueryScalaApi().query(mem.toString())

            // Example of additional result stream processing on client side
            val sink = results
              // filter on client side using `filter` built-in operator
              .filter(it => it.getValue.asInstanceOf[Double] > 55)
              // take first 20 records
              .take(20)
              // print results
              .runWith(Sink.foreach[FluxRecord](it =>
                println(s"Measurement: ${it.getMeasurement}, value: ${it.getValue}")))

            // wait to finish
            Await.result(sink, Duration.Inf)

            influxDBClient.close()
            system.terminate()
          }
        }

    Version

    The latest version for Maven dependency:

        <dependency>
          <groupId>com.influxdb</groupId>
          <artifactId>influxdb-client-scala</artifactId>
          <version>1.13.0</version>
        </dependency>

    Or when using Gradle:

        dependencies {
            compile "com.influxdb:influxdb-client-scala:1.13.0"
        }

    Snapshot Repository

    Snapshots are deployed to the OSS Snapshot repository.

    Maven

        <repository>
          <id>ossrh</id>
          <name>OSS Snapshot repository</name>
          <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
          <releases>
            <enabled>false</enabled>
          </releases>
          <snapshots>
            <enabled>true</enabled>
          </snapshots>
        </repository>

    Gradle

        repositories {
            maven { url "https://oss.sonatype.org/content/repositories/snapshots" }
        }