TsFile-Spark-Connector用户指南

1. 关于TsFile-Spark-Connector

TsFile-Spark-Connector对Tsfile类型的外部数据源实现Spark的支持。 这使用户可以通过Spark读取,写入和查询Tsfile。

使用此连接器,您可以

  • 从本地文件系统或hdfs将单个TsFile加载到Spark
  • 从本地文件系统或hdfs将特定目录中的所有文件加载到Spark
  • 将数据从Spark写入TsFile

2. 系统要求

Spark版本Scala 版本Java 版本TsFile
2.4.32.11.81.80.9.3

注意:有关如何下载和使用TsFile的更多信息,请参见以下链接: https://github.com/apache/incubator-iotdb/tree/master/tsfile.

3. 快速开始

本地模式

在本地模式下使用TsFile-Spark-Connector启动Spark:

  1. ./<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.9.3-jar-with-dependencies.jar

注意:

分布式模式

在分布式模式下使用TsFile-Spark-Connector启动Spark(即,Spark集群通过spark-shell连接):

  1. . /<spark-shell-path> --jars tsfile-spark-connector.jar,tsfile-0.9.3-jar-with-dependencies.jar --master spark://ip:7077

注意:

4. 数据类型对应

TsFile数据类型SparkSQL数据类型
BOOLEANBooleanType
INT32IntegerType
INT64LongType
FLOATFloatType
DOUBLEDoubleType
TEXTStringType

5. 模式推断

显示TsFile的方式取决于架构。 以以下TsFile结构为例:TsFile模式中有三个度量:状态,温度和硬件。 这三种测量的基本信息如下:

名称类型编码
statusBooleanPLAIN
temperatureFloatRLE
hardwareTextPLAIN

TsFile中的现有数据如下:

device:root.ln.wf01.wt01device:root.ln.wf02.wt02
statustemperaturehardwarestatus
timevaluetimevaluetimevaluetimevalue
1True12.22“aaa”1True
3True22.24“bbb”2False
5False32.16“ccc”4True

相应的SparkSQL表如下:

timeroot.ln.wf02.wt02.temperatureroot.ln.wf02.wt02.statusroot.ln.wf02.wt02.hardwareroot.ln.wf01.wt01.temperatureroot.ln.wf01.wt01.statusroot.ln.wf01.wt01.hardware
1nulltruenull2.2truenull
2nullfalseaaa2.2nullnull
3nullnullnull2.1truenull
4nulltruebbbnullnullnull
5nullnullnullnullfalsenull
6nullnullcccnullnullnull

您还可以使用如下所示的窄表形式:(您可以参阅第6部分,了解如何使用窄表形式)

timedevice_namestatushardwaretemperature
1root.ln.wf02.wt01truenull2.2
1root.ln.wf02.wt02truenullnull
2root.ln.wf02.wt01nullnull2.2
2root.ln.wf02.wt02falseaaanull
3root.ln.wf02.wt01truenull2.1
4root.ln.wf02.wt02truebbbnull
5root.ln.wf02.wt01falsenullnull
6root.ln.wf02.wt02nullcccnull

6. Scala API

注意:请记住预先分配必要的读写权限。

示例1:从本地文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("test.tsfile", true)
  5. narrow_df.show

示例2:从hadoop文件系统读取

  1. import org.apache.iotdb.tsfile._
  2. val wide_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. wide_df.show
  4. val narrow_df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  5. narrow_df.show

示例3:从特定目录读取

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/usr/hadoop")
  3. df.show

注1:现在不支持目录中所有TsFile的全局时间排序。

注2:具有相同名称的度量应具有相同的架构。

示例4:广泛形式的查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where `device_1.sensor_1`>0 and `device_1.sensor_2` < 22")
  5. newDf.show
  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

示例5:缩小形式的查询

  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select * from tsfile_table where device_name = 'root.ln.wf02.wt02' and temperature > 5")
  5. newDf.show
  1. import org.apache.iotdb.tsfile._
  2. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  3. df.createOrReplaceTempView("tsfile_table")
  4. val newDf = spark.sql("select count(*) from tsfile_table")
  5. newDf.show

例子6:以宽写形式

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile")
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output")
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output")
  7. newDf.show

示例6:以窄写形式

  1. // we only support wide_form table to write
  2. import org.apache.iotdb.tsfile._
  3. val df = spark.read.tsfile("hdfs://localhost:9000/test.tsfile", true)
  4. df.show
  5. df.write.tsfile("hdfs://localhost:9000/output", true)
  6. val newDf = spark.read.tsfile("hdfs://localhost:9000/output", true)
  7. newDf.show

附录A:模式推断的旧设计

显示TsFile的方式与TsFile Schema有关。 以以下TsFile结构为例:TsFile架构中有三个度量:状态,温度和硬件。 这三个度量的基本信息如下:

名称类型编码
statusBooleanPLAIN
temperatureFloatRLE
hardwareTextPLAIN

测量的基本信息

文件中的现有数据如下:

delta_object:root.ln.wf01.wt01delta_object:root.ln.wf02.wt02delta_object:root.sgcc.wf03.wt01
statustemperaturehardwarestatusstatustemperature
timevaluetimevaluetimevaluetimevaluetimevaluetimevalue
1True12.22“aaa”1True2True33.3
3True22.24“bbb”2False3True66.6
5False32.16“ccc”4True4True88.8
7True42.08“ddd”5False6True99.9

A set of time-series data

有两种显示方法:

默认方式

将创建两列来存储设备的完整路径:time(LongType)和delta_object(StringType)。

  • time : Timestamp, LongType
  • delta_object : Delta_object ID, StringType

接下来,为每个度量创建一列以存储特定数据。 SparkSQL表结构如下:

time(LongType)delta_object(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1root.ln.wf01.wt01True2.2null
1root.ln.wf02.wt02Truenullnull
2root.ln.wf01.wt01null2.2null
2root.ln.wf02.wt02Falsenull“aaa”
2root.sgcc.wf03.wt01Truenullnull
3root.ln.wf01.wt01True2.1null
3root.sgcc.wf03.wt01True3.3null
4root.ln.wf01.wt01null2.0null
4root.ln.wf02.wt02Truenull“bbb”
4root.sgcc.wf03.wt01Truenullnull
5root.ln.wf01.wt01Falsenullnull
5root.ln.wf02.wt02Falsenullnull
5root.sgcc.wf03.wt01Truenullnull
6root.ln.wf02.wt02nullnull“ccc”
6root.sgcc.wf03.wt01null6.6null
7root.ln.wf01.wt01Truenullnull
8root.ln.wf02.wt02nullnull“ddd”
8root.sgcc.wf03.wt01null8.8null
9root.sgcc.wf03.wt01null9.9null
展开delta_object列

用“。”展开设备列。 分成多个列,忽略根目录“ root”。 方便进行更丰富的聚合操作。 如果用户希望使用这种显示方式,则需要在表创建语句中设置参数“ delta \ _object \ _name”(请参阅本手册第5.1节中的示例5),如本示例中的那样,参数“ delta \ _object \ _name“设置为” root.device.turbine“。 路径层的数量必须是一对一的。 此时,将为设备路径的每一层(“根”层除外)创建一列。 列名是参数中的名称,值是设备相应层的名称。 接下来,将为每个度量创建一列以存储特定数据。

然后,SparkSQL表结构如下:

time(LongType)group(StringType)field(StringType)device(StringType)status(BooleanType)temperature(FloatType)hardware(StringType)
1lnwf01wt01True2.2null
1lnwf02wt02Truenullnull
2lnwf01wt01null2.2null
2lnwf02wt02Falsenull“aaa”
2sgccwf03wt01Truenullnull
3lnwf01wt01True2.1null
3sgccwf03wt01True3.3null
4lnwf01wt01null2.0null
4lnwf02wt02Truenull“bbb”
4sgccwf03wt01Truenullnull
5lnwf01wt01Falsenullnull
5lnwf02wt02Falsenullnull
5sgccwf03wt01Truenullnull
6lnwf02wt02nullnull“ccc”
6sgccwf03wt01null6.6null
7lnwf01wt01Truenullnull
8lnwf02wt02nullnull“ddd”
8sgccwf03wt01null8.8null
9sgccwf03wt01null9.9null

TsFile-Spark-Connector可以在SparkSQL By SparkSQL中将一个或多个TsFiles显示为表。 它还允许用户指定一个目录或使用通配符来匹配多个目录。 如果有多个TsFile,则所有TsFile中的度量的并集将保留在表中,并且具有相同名称的度量默认情况下将具有相同的数据类型。 请注意,如果存在名称相同但数据类型不同的情况,则TsFile-Spark-Connector将无法保证结果的正确性。

写入过程是将一个DataFrame写入一个或多个TsFiles。 默认情况下,需要包括两列:time和delta_object。 其余的列用作“度量”。 如果用户想将第二个表结构写回到TsFile,则可以设置“ delta \ _object \ _name”参数(请参阅本手册5.1节的5.1节)。

附录B:旧注

注意:检查Spark根目录中的jar软件包,并分别用libthrift-0.9.1.jar和libfb303-0.9.1.jar替换libthrift-0.9.3.jar和libfb303-0.9.3.jar。