# Spark-IoTDB

## Versions

The versions required for Spark and Java are as follows:

| Spark Version | Scala Version | Java Version | TsFile |
| ------------- | ------------- | ------------ | ------ |
| 2.4.5         | 2.12          | 1.8          | 0.12.0 |

Note: currently only Spark 2.4.3 is supported. There are known issues with 2.4.7; do not use it.

## Install

```shell
mvn clean scala:compile compile install
```

## Maven Dependency

```xml
<dependency>
    <groupId>org.apache.iotdb</groupId>
    <artifactId>spark-iotdb-connector</artifactId>
    <version>0.12.5</version>
</dependency>
```

## spark-shell User Guide

```shell
spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .load

df.printSchema()
df.show()
```

To partition the RDD:

```shell
spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar

import org.apache.iotdb.spark.db._

val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", [lower bound of the time range to query (inclusive)])
  .option("upperBound", [upper bound of the time range to query (inclusive)])
  .option("numPartition", [the number of partitions you want])
  .load

df.printSchema()
df.show()
```
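For example, a concrete invocation might look like the sketch below. The time bounds and partition count are illustrative values, not recommendations; choose them to match your data's time range:

```scala
import org.apache.iotdb.spark.db._

// Illustrative values: scan timestamps 1 through 10000 (inclusive), split into 5 partitions
val df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root")
  .option("lowerBound", "1")
  .option("upperBound", "10000")
  .option("numPartition", "5")
  .load

df.show()
```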

## Schema Inference

Take the following TsFile structure as an example. There are three measurements in the TsFile schema: status, temperature, and hardware. The basic information about these three measurements is as follows:

| Name        | Type    | Encoding |
| ----------- | ------- | -------- |
| status      | Boolean | PLAIN    |
| temperature | Float   | RLE      |
| hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows:

[Figure: existing data in the TsFile]

The wide (default) table form is as follows:

| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
| ---- | ----------------------------- | ------------------------ | -------------------------- | ----------------------------- | ------------------------ | -------------------------- |
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |

You can also use the narrow table form, shown below (see the Transform section below for how to convert between the two forms):

| time | device_name       | status | hardware | temperature |
| ---- | ----------------- | ------ | -------- | ----------- |
| 1    | root.ln.wf02.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf02.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf02.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf02.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |

## Transform Between Wide and Narrow Tables

From wide to narrow:

```scala
import org.apache.iotdb.spark.db._

val wide_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root where time < 1100 and time > 1000")
  .load
val narrow_df = Transformer.toNarrowForm(spark, wide_df)
```

From narrow to wide (`narrow_df` here is the narrow DataFrame produced above):

```scala
import org.apache.iotdb.spark.db._

val wide_df = Transformer.toWideForm(spark, narrow_df)
```
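One reason to use the narrow form is that standard DataFrame operations can then be applied per device. A minimal sketch, using the illustrative device path from the narrow table above:

```scala
import org.apache.spark.sql.functions.col

// Keep only the rows of one device; "device_name" is the column added by the narrow form
val one_device = narrow_df.filter(col("device_name") === "root.ln.wf02.wt01")
one_device.show()
```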

## Java User Guide

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import org.apache.iotdb.spark.db.*;

public class Example {

  public static void main(String[] args) {
    SparkSession spark = SparkSession
        .builder()
        .appName("Build a DataFrame from Scratch")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
        .option("url", "jdbc:iotdb://127.0.0.1:6667/")
        .option("sql", "select * from root")
        .load();

    df.printSchema();
    df.show();

    Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
    narrowTable.show();
  }
}
```

## Write Data to IoTDB

### User Guide

```scala
// write a narrow table
val df = spark.createDataFrame(List(
  (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "device_name")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")
  .withColumnRenamed("_5", "s2")
  .withColumnRenamed("_6", "s3")
  .withColumnRenamed("_7", "s4")
  .withColumnRenamed("_8", "s5")

dfWithColumn
  .write
  .format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .save

// write a wide table
val df = spark.createDataFrame(List(
  (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
  (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

val dfWithColumn = df.withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "root.test.d0.s0")
  .withColumnRenamed("_3", "root.test.d0.s1")
  .withColumnRenamed("_4", "root.test.d0.s2")
  .withColumnRenamed("_5", "root.test.d0.s3")
  .withColumnRenamed("_6", "root.test.d0.s4")
  .withColumnRenamed("_7", "root.test.d0.s5")

dfWithColumn.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10")
  .save
```

### Notes

1. You can write data to IoTDB directly, whether the DataFrame contains a wide table or a narrow table.
2. The parameter `numPartition` sets the number of partitions. The DataFrame you want to save is repartitioned according to this parameter before the data is written. Each partition opens its own session to write data, which increases the number of concurrent requests.
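As a quick sanity check, you can read back what the write examples above stored. A minimal sketch, assuming the same local IoTDB instance at 127.0.0.1:6667 used throughout this guide:

```scala
import org.apache.iotdb.spark.db._

// Read back the rows written to root.test.d0 in the examples above
val check_df = spark.read.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("sql", "select * from root.test.d0")
  .load

check_df.show()
```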