Spark-IoTDB

Version

The required versions of Spark, Scala, and Java are as follows:

| Spark Version | Scala Version | Java Version | TsFile |
|---------------|---------------|--------------|--------|
| 2.4.0-3.2.0   | 2.12          | 1.8          | 0.13.0 |

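Note

  1. The Spark IoTDB Connector only supports Spark 2.4.0 through 3.2.0 with Scala 2.12. To support other versions, change the Scala version in the pom file of the spark-iotdb-connector module in the source code, then recompile.

  2. The thrift versions used by IoTDB and Spark conflict. To resolve the conflict, run these two commands:

         rm -f $SPARK_HOME/jars/libthrift*
         cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/

     Otherwise, you can only debug the code in an IDE. In addition, if you submit jobs with spark-submit, you must package your application together with its dependencies.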

Install

mvn clean scala:compile compile install

Maven Dependency

    <dependency>
      <groupId>org.apache.iotdb</groupId>
      <artifactId>spark-iotdb-connector</artifactId>
      <version>0.13.0</version>
    </dependency>

Spark-shell User Guide

    spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar

    import org.apache.iotdb.spark.db._

    // Load the result of the query as a DataFrame
    val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").load

    df.printSchema()

    df.show()

To partition the RDD, you can do the following (replace the bracketed placeholders with your own values):

    spark-shell --jars spark-iotdb-connector-0.13.0.jar,iotdb-jdbc-0.13.0-jar-with-dependencies.jar

    import org.apache.iotdb.spark.db._

    val df = spark.read.format("org.apache.iotdb.spark.db").option("url","jdbc:iotdb://127.0.0.1:6667/").option("sql","select * from root").
      option("lowerBound", [lower bound of the query time range (inclusive)]).option("upperBound", [upper bound of the query time range (inclusive)]).
      option("numPartition", [the number of partitions you want]).load

    df.printSchema()

    df.show()
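
To illustrate what the lowerBound, upperBound, and numPartition options describe, the following Java sketch splits an inclusive time range into contiguous sub-ranges, one per partition. It assumes the range is divided as evenly as possible. The class name TimeRangePartitioner and the method split are hypothetical and are not part of the connector, whose actual splitting logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Spark IoTDB Connector.
public class TimeRangePartitioner {

    /**
     * Splits the inclusive range [lowerBound, upperBound] into at most
     * numPartition contiguous, inclusive sub-ranges of near-equal size.
     */
    public static List<long[]> split(long lowerBound, long upperBound, int numPartition) {
        if (numPartition <= 0 || upperBound < lowerBound) {
            throw new IllegalArgumentException("invalid bounds or partition count");
        }
        long total = upperBound - lowerBound + 1;        // timestamps covered by the range
        int parts = (int) Math.min(numPartition, total); // never create empty partitions
        long base = total / parts;
        long extra = total % parts;                      // first `extra` partitions get one more
        List<long[]> ranges = new ArrayList<>();
        long start = lowerBound;
        for (int i = 0; i < parts; i++) {
            long size = base + (i < extra ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new long[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Print the time filter that each partition could apply (assumed form).
        for (long[] r : split(1000L, 1099L, 4)) {
            System.out.println("time >= " + r[0] + " and time <= " + r[1]);
        }
    }
}
```

With the bounds 1000 and 1099 and four partitions, each sub-range covers 25 timestamps, so every partition would read a disjoint slice of the time axis.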

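Schema Inference

Take the following TsFile structure as an example. The TsFile schema contains three measurements: status, temperature, and hardware. Their basic information is as follows:

| Name        | Type    | Encoding |
|-------------|---------|----------|
| status      | Boolean | PLAIN    |
| temperature | Float   | RLE      |
| hardware    | Text    | PLAIN    |

The existing data in the TsFile is as follows: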

  • d1:root.ln.wf01.wt01
  • d2:root.ln.wf02.wt02
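
| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
|------|-----------|------|----------------|------|-------------|------|-----------|
| 1    | True      | 1    | 2.2            | 2    | "aaa"       | 1    | True      |
| 3    | True      | 2    | 2.2            | 4    | "bbb"       | 2    | False     |
| 5    | False     | 3    | 2.1            | 6    | "ccc"       | 4    | True      |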

The wide (default) table form is as follows:

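| time | root.ln.wf02.wt02.temperature | root.ln.wf02.wt02.status | root.ln.wf02.wt02.hardware | root.ln.wf01.wt01.temperature | root.ln.wf01.wt01.status | root.ln.wf01.wt01.hardware |
|------|-------------------------------|--------------------------|----------------------------|-------------------------------|--------------------------|----------------------------|
| 1    | null                          | true                     | null                       | 2.2                           | true                     | null                       |
| 2    | null                          | false                    | aaa                        | 2.2                           | null                     | null                       |
| 3    | null                          | null                     | null                       | 2.1                           | true                     | null                       |
| 4    | null                          | true                     | bbb                        | null                          | null                     | null                       |
| 5    | null                          | null                     | null                       | null                          | false                    | null                       |
| 6    | null                          | null                     | ccc                        | null                          | null                     | null                       |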
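
The wide table above can be read as a merge of the individual series by timestamp, with null wherever a series has no point at that time. The following is a minimal Java sketch of that idea using the sample data from this section. The class WideTableSketch and its methods are hypothetical illustrations and do not reflect how the connector infers the schema internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not the connector's implementation.
public class WideTableSketch {

    // Adds one (time, value) point of a series; rows are keyed and sorted by timestamp.
    static void put(TreeMap<Long, Map<String, Object>> table,
                    String column, long time, Object value) {
        table.computeIfAbsent(time, t -> new LinkedHashMap<>()).put(column, value);
    }

    // Builds the wide table from the sample series of devices d1 and d2.
    public static TreeMap<Long, Map<String, Object>> build() {
        TreeMap<Long, Map<String, Object>> table = new TreeMap<>();
        String d1 = "root.ln.wf01.wt01";
        String d2 = "root.ln.wf02.wt02";
        put(table, d1 + ".status", 1L, true);
        put(table, d1 + ".status", 3L, true);
        put(table, d1 + ".status", 5L, false);
        put(table, d1 + ".temperature", 1L, 2.2f);
        put(table, d1 + ".temperature", 2L, 2.2f);
        put(table, d1 + ".temperature", 3L, 2.1f);
        put(table, d2 + ".hardware", 2L, "aaa");
        put(table, d2 + ".hardware", 4L, "bbb");
        put(table, d2 + ".hardware", 6L, "ccc");
        put(table, d2 + ".status", 1L, true);
        put(table, d2 + ".status", 2L, false);
        put(table, d2 + ".status", 4L, true);
        return table;
    }

    public static void main(String[] args) {
        // A column missing from a row corresponds to a null cell in the wide table.
        build().forEach((time, row) -> System.out.println(time + " -> " + row));
    }
}
```

Each distinct timestamp becomes one row, which is why the six timestamps in the sample data yield six rows in the wide table.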

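You can also use the narrow table form shown below (see the section on converting between wide and narrow tables for how to use it):

| time | device_name       | status | hardware | temperature |
|------|-------------------|--------|----------|-------------|
| 1    | root.ln.wf01.wt01 | true   | null     | 2.2         |
| 1    | root.ln.wf02.wt02 | true   | null     | null        |
| 2    | root.ln.wf01.wt01 | null   | null     | 2.2         |
| 2    | root.ln.wf02.wt02 | false  | aaa      | null        |
| 3    | root.ln.wf01.wt01 | true   | null     | 2.1         |
| 4    | root.ln.wf02.wt02 | true   | bbb      | null        |
| 5    | root.ln.wf01.wt01 | false  | null     | null        |
| 6    | root.ln.wf02.wt02 | null   | ccc      | null        |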

Converting Between Wide and Narrow Tables

  • From wide to narrow

        import org.apache.iotdb.spark.db._

        val wide_df = spark.read.format("org.apache.iotdb.spark.db").option("url", "jdbc:iotdb://127.0.0.1:6667/").option("sql", "select * from root where time < 1100 and time > 1000").load
        val narrow_df = Transformer.toNarrowForm(spark, wide_df)

  • From narrow to wide

        import org.apache.iotdb.spark.db._

        val wide_df = Transformer.toWideForm(spark, narrow_df)
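
To make the relationship between the two forms concrete, the following Java sketch converts a single wide row into narrow rows on plain in-memory maps. It assumes that the device name is everything before the last dot of a column name and that the measurement is the part after it. The class NarrowFormSketch and the method toNarrow are hypothetical and only illustrate the idea; in Spark, use Transformer.toNarrowForm as shown above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; in Spark, use Transformer.toNarrowForm.
public class NarrowFormSketch {

    /**
     * Converts one wide row (full series path -> value) at the given time
     * into narrow rows, one per device that has at least one non-null value.
     */
    public static List<Map<String, Object>> toNarrow(long time, Map<String, Object> wideRow) {
        Map<String, Map<String, Object>> byDevice = new LinkedHashMap<>();
        for (Map.Entry<String, Object> cell : wideRow.entrySet()) {
            if (cell.getValue() == null) {
                continue; // null cells carry no data for the narrow form
            }
            String path = cell.getKey();
            int dot = path.lastIndexOf('.');
            if (dot < 0) {
                throw new IllegalArgumentException("not a series path: " + path);
            }
            String device = path.substring(0, dot);       // e.g. root.ln.wf01.wt01
            String measurement = path.substring(dot + 1); // e.g. status
            Map<String, Object> row = byDevice.computeIfAbsent(device, d -> {
                Map<String, Object> r = new LinkedHashMap<>();
                r.put("time", time);
                r.put("device_name", d);
                return r;
            });
            row.put(measurement, cell.getValue());
        }
        return new ArrayList<>(byDevice.values());
    }

    public static void main(String[] args) {
        // The wide row at time 1 from the sample data.
        Map<String, Object> wideRow = new LinkedHashMap<>();
        wideRow.put("root.ln.wf01.wt01.status", true);
        wideRow.put("root.ln.wf01.wt01.temperature", 2.2f);
        wideRow.put("root.ln.wf02.wt02.status", true);
        wideRow.put("root.ln.wf02.wt02.hardware", null);
        toNarrow(1L, wideRow).forEach(System.out::println);
    }
}
```

A measurement that is absent from a narrow row in this sketch corresponds to a null cell in the narrow table.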

Java User Guide

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.iotdb.spark.db.*;

    public class Example {

      public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("Build a DataFrame from Scratch")
            .master("local[*]")
            .getOrCreate();

        // Read the query result in wide (default) form
        Dataset<Row> df = spark.read().format("org.apache.iotdb.spark.db")
            .option("url","jdbc:iotdb://127.0.0.1:6667/")
            .option("sql","select * from root").load();

        df.printSchema();
        df.show();

        // Convert the wide table to narrow form
        Dataset<Row> narrowTable = Transformer.toNarrowForm(spark, df);
        narrowTable.show();
      }
    }

Write Data to IoTDB

User Guide

    // import narrow table
    val df = spark.createDataFrame(List(
      (1L, "root.test.d0", 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))

    val dfWithColumn = df.withColumnRenamed("_1", "Time")
      .withColumnRenamed("_2", "device_name")
      .withColumnRenamed("_3", "s0")
      .withColumnRenamed("_4", "s1")
      .withColumnRenamed("_5", "s2")
      .withColumnRenamed("_6", "s3")
      .withColumnRenamed("_7", "s4")
      .withColumnRenamed("_8", "s5")

    dfWithColumn
      .write
      .format("org.apache.iotdb.spark.db")
      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
      .save

    // import wide table
    val df = spark.createDataFrame(List(
      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))

    val dfWithColumn = df.withColumnRenamed("_1", "Time")
      .withColumnRenamed("_2", "root.test.d0.s0")
      .withColumnRenamed("_3", "root.test.d0.s1")
      .withColumnRenamed("_4", "root.test.d0.s2")
      .withColumnRenamed("_5", "root.test.d0.s3")
      .withColumnRenamed("_6", "root.test.d0.s4")
      .withColumnRenamed("_7", "root.test.d0.s5")

    dfWithColumn.write.format("org.apache.iotdb.spark.db")
      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
      .option("numPartition", "10")
      .save

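Note

  1. Whether the dataframe holds a narrow table or a wide table, you can write it to IoTDB directly.
  2. The numPartition parameter sets the number of partitions. The dataframe is repartitioned before the data is written, and each partition opens its own session to write data, which increases write concurrency.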