TsFile API

TsFile 是在 IoTDB 中使用的时间序列的文件格式。在这个章节中,我们将介绍这种文件格式的用法。

安装 TsFile libaray

在您自己的项目中有两种方法使用 TsFile .

  • 使用 jar 包:

    • 编译源码生成 jar 包

      1. git clone https://github.com/apache/iotdb.git
      2. cd tsfile/
      3. mvn clean package -Dmaven.test.skip=true

      命令执行完成之后,所有的 jar 包都可以从 target/ 目录下找到。之后您可以在自己的工程中倒入 target/tsfile-0.11.1-jar-with-dependencies.jar.

  • 使用 Maven 依赖:

    编译源码并且部署到您的本地仓库中需要 3 步:

    • 下载源码

      1. git clone https://github.com/apache/iotdb.git
    • 编译源码和部署到本地仓库

      1. cd tsfile/
      2. mvn clean install -Dmaven.test.skip=true
    • 在您自己的工程中增加依赖:

      1. <dependency>
      2. <groupId>org.apache.iotdb</groupId>
      3. <artifactId>tsfile</artifactId>
      4. <version>0.11.1</version>
      5. </dependency>

    或者,您可以直接使用官方的 Maven 仓库:

    • 首先,在${username}\.m2\settings.xml目录下的settings.xml文件中<profiles> 节中增加<profile>,内容如下:

      1. <profile>
      2. <id>allow-snapshots</id>
      3. <activation><activeByDefault>true</activeByDefault></activation>
      4. <repositories>
      5. <repository>
      6. <id>apache.snapshots</id>
      7. <name>Apache Development Snapshot Repository</name>
      8. <url>https://repository.apache.org/content/repositories/snapshots/</url>
      9. <releases>
      10. <enabled>false</enabled>
      11. </releases>
      12. <snapshots>
      13. <enabled>true</enabled>
      14. </snapshots>
      15. </repository>
      16. </repositories>
      17. </profile>
    • 之后您可以在您的工程中增加如下依赖:

      1. <dependency>
      2. <groupId>org.apache.iotdb</groupId>
      3. <artifactId>tsfile</artifactId>
      4. <version>0.11.1</version>
      5. </dependency>

TSFile 的使用

本章节演示TsFile的详细用法。

时序数据(Time-series Data)

一个时序是由4个序列组成,分别是 device, measurement, time, value。

  • measurement: 时间序列描述的是一个物理或者形式的测量(measurement),比如:城市的温度,一些商品的销售数量或者是火车在不同时间的速度。 传统的传感器(如温度计)也采用单次测量(measurement)并产生时间序列,我们将在下面交替使用测量(measurement)和传感器。

  • device: 一个设备指的是一个正在进行多次测量(产生多个时间序列)的实体,例如, ​ ​ ​ 一列正在运行的火车监控它的速度、油表、它已经运行的英里数,当前的乘客每个都被传送到一个时间序列。

表1描述了一组时间序列数据。下表中显示的集合包含一个名为 “device_1” 的设备,它有三个测量值(measurement)分别是 “sensor_1”, “sensor_2” 和 “sensor_3”.

device_1
sensor_1sensor_2sensor_3
timevaluetimevaluetimevalue
11.2120250
31.4220451
51.1321652
71.8420853

一组时间序列数据

单行数据: 在许多工业应用程序中,一个设备通常包含多个传感器,这些传感器可能同时具有多个值,这称为一行数据。

在形式上,一行数据包含一个device_id,它是一个时间戳,表示从 1970年1月1日 00:00:00 开始的毫秒数, 以及由measurement_id和相应的value组成的几个数据对。一行中的所有数据对都属于这个device_id,并且具有相同的时间戳。 如果其中一个度量值measurements在某个时间戳timestamp没有值value,将使用一个空格表示(实际上 TsFile 并不存储 null 值)。 其格式如下:

  1. device_id, timestamp, <measurement_id, value>...

示例数据如下所示。在本例中,两个度量值(measurement)的数据类型分别是INT32FLOAT

  1. device_1, 1490860659000, m1, 10, m2, 12.12

写入 TsFile

生成一个 TsFile 文件

TsFile可以通过以下三个步骤生成,完整的代码参见”写入 TsFile 示例”章节。

  • 首先,构造一个TsFileWriter实例。

    以下是可用的构造函数:

    • 没有预定义 schema
    1. public TsFileWriter(File file) throws IOException
    • 预定义 schema
    1. public TsFileWriter(File file, Schema schema) throws IOException

    这个是用于使用 HDFS 文件系统的。TsFileOutput可以是HDFSOutput类的一个实例。

    1. public TsFileWriter(TsFileOutput output, Schema schema) throws IOException

    如果你想自己设置一些 TSFile 的配置,你可以使用config参数。比如:

    1. TSFileConfig conf = new TSFileConfig();
    2. conf.setTSFileStorageFs("HDFS");
    3. TsFileWriter tsFileWriter = new TsFileWriter(file, schema, conf);

    在上面的例子中,数据文件将存储在 HDFS 中,而不是本地文件系统中。如果你想在本地文件系统中存储数据文件,你可以使用conf.setTSFileStorageFs("LOCAL"),这也是默认的配置。

    您还可以通过config.setHdfsIp(...)config.setHdfsPort(...)来配置 HDFS 的 IP 和端口。默认的 IP是localhost,默认的RPC端口是9000.

    参数:

    • file : 写入 TsFile 数据的文件

    • schema : 文件的 schemas,将在下章进行介绍

    • config : TsFile 的一些配置项

  • 第二步,添加测量值(measurement)

    你也可以先创建一个Schema类的实例然后把它传递给TsFileWriter类的构造函数

    Schema类保存的是一个映射关系,key 是一个 measurement 的名字,value 是 measurement schema.

    下面是一系列接口:

    1. // Create an empty Schema or from an existing map
    2. public Schema()
    3. public Schema(Map<String, MeasurementSchema> measurements)
    4. // Use this two interfaces to add measurements
    5. public void registerMeasurement(MeasurementSchema descriptor)
    6. public void registerMeasurements(Map<String, MeasurementSchema> measurements)
    7. // Some useful getter and checker
    8. public TSDataType getMeasurementDataType(String measurementId)
    9. public MeasurementSchema getMeasurementSchema(String measurementId)
    10. public Map<String, MeasurementSchema> getAllMeasurementSchema()
    11. public boolean hasMeasurement(String measurementId)

    你可以在TsFileWriter类中使用以下接口来添加额外的测量(measurement): ​

    1. public void addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException

    MeasurementSchema类保存了一个测量(measurement)的信息,有几个构造函数:

    1. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding)
    2. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType)
    3. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType,
    4. Map<String, String> props)

    参数:

    • measurementID: 测量的名称,通常是传感器的名称。

    • type: 数据类型,现在支持六种类型: BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT;

    • encoding: 编码类型. 参见 Chapter 2-3.

    • compression: 压缩方式. 现在支持 UNCOMPRESSEDSNAPPY.

    • props: 特殊数据类型的属性。比如说FLOATDOUBLE可以设置max_point_numberTEXT可以设置max_string_length。 可以使用Map来保存键值对,比如(“max_point_number”, “3”)。

    注意: 虽然一个测量(measurement)的名字可以被用在多个deltaObjects中, 但是它的参数是不允许被修改的。比如: 不允许多次为同一个测量(measurement)名添加不同类型的编码。下面是一个错误示例:

    1. // The measurement "sensor_1" is float type
    2. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
    3. // This call will throw a WriteProcessException exception
    4. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
  • 第三,插入和写入数据。

    使用这个接口创建一个新的TSRecord(时间戳和设备对)。

    1. public TSRecord(long timestamp, String deviceId)

    然后创建一个DataPoint(度量(measurement)和值的对应),并使用 addTuple 方法将数据 DataPoint 添加正确的值到 TsRecord。

    用下面这种方法写

    1. public void write(TSRecord record) throws IOException, WriteProcessException
  • 最后,调用close方法来完成写入过程。

    1. public void close() throws IOException

我们也支持将数据写入已关闭的 TsFile 文件中。

  • 使用ForceAppendTsFileWriter打开已经关闭的文件。
  1. public ForceAppendTsFileWriter(File file) throws IOException
  • 调用 doTruncate 去掉文件的Metadata部分

  • 然后使用 ForceAppendTsFileWriter 构造另一个TsFileWriter

  1. public TsFileWriter(TsFileIOWriter fileWriter) throws IOException

请注意 此时需要重新添加测量值(measurement) 再进行上述写入操作。

写入 TsFile 示例

您需要安装 TsFile 到本地的 Maven 仓库中。

  1. mvn clean install -pl tsfile -am -DskipTests

如果存在非对齐的时序数据(比如:不是所有的传感器都有值),您可以通过构造TSRecord来写入。

更详细的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTSRecord.java中查看

如果所有时序数据都是对齐的,您可以通过构造Tablet来写入数据。

更详细的例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTablet.java中查看

在已关闭的TsFile 文件中写入新数据的详细例子可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileForceAppendWrite.java中查看

读取 TsFile 接口

开始之前

“时序数据”章节中的数据集在本章节做具体的介绍。下表中显示的集合包含一个名为”device_1”的 deltaObject,包含了 3 个名为”sensor_1”,”sensor_2”和”sensor_3”的测量(measurement)。 测量值被简化成一个简单的例子,每条数据只包含 4 条时间和值的对应数据。

device_1
sensor_1sensor_2sensor_3
timevaluetimevaluetimevalue
11.2120250
31.4220451
51.1321652
71.8420853

一组时间序列数据

路径的定义

路径是一个点(.)分隔的字符串,它唯一地标识 TsFile 中的时间序列,例如:”root.area_1.device_1.sensor_1”。 最后一部分”sensor_1”称为”measurementId”,其余部分”root.area_1.device_1”称为deviceId。 正如之前提到的,不同设备中的相同测量(measurement)具有相同的数据类型和编码,设备也是唯一的。

在read接口中,参数paths表示要选择的测量值(measurement)。 Path实例可以很容易地通过类Path来构造。例如:

  1. Path p = new Path("device_1.sensor_1");

我们可以为查询传递一个 ArrayList 路径,以支持多个路径查询。

  1. List<Path> paths = new ArrayList<Path>();
  2. paths.add(new Path("device_1.sensor_1"));
  3. paths.add(new Path("device_1.sensor_3"));

注意: 在构造路径时,参数的格式应该是一个点(.)分隔的字符串,最后一部分是measurement,其余部分确认为deviceId。

定义 Filter

使用条件过滤

在 TsFile 读取过程中使用 Filter 来选择满足一个或多个给定条件的数据。

IExpression

IExpression是一个过滤器表达式接口,它将被传递给系统查询时调用。 我们创建一个或多个筛选器表达式,并且可以使用Binary Filter Operators将它们连接形成最终表达式。

  • 创建一个Filter表达式

    有两种类型的过滤器。

    • TimeFilter: 使用时序数据中的time过滤。

      1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter);

      使用以下关系获得一个TimeFilter对象(值是一个 long 型变量)。

      RelationshipDescription
      TimeFilter.eq(value)选择时间等于值的数据
      TimeFilter.lt(value)选择时间小于值的数据
      TimeFilter.gt(value)选择时间大于值的数据
      TimeFilter.ltEq(value)选择时间小于等于值的数据
      TimeFilter.gtEq(value)选择时间大于等于值的数据
      TimeFilter.notEq(value)选择时间不等于值的数据
      TimeFilter.not(TimeFilter)选择时间不满足另一个时间过滤器的数据
    • ValueFilter: 使用时序数据中的value过滤。

      1. IExpression valueFilterExpr = new SingleSeriesExpression(Path, ValueFilter);

      ValueFilter的用法与TimeFilter相同,只是需要确保值的类型等于measurement(在路径中定义)的类型。

  • Binary Filter Operators

    Binary filter operators 可以用来连接两个单独的表达式。

    • BinaryExpression.and(Expression, Expression): 选择同时满足两个表达式的数据。
    • BinaryExpression.or(Expression, Expression): 选择满足任意一个表达式值的数据。
Filter Expression 示例
  • TimeFilterExpression 示例

    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.eq(15)); // series time = 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.ltEq(15)); // series time <= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.lt(15)); // series time < 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.gtEq(15)); // series time >= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.notEq(15)); // series time != 15
    1. IExpression timeFilterExpr = BinaryExpression.and(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // 15 <= series time < 25
    1. IExpression timeFilterExpr = BinaryExpression.or(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // series time >= 15 or series time < 25

读取接口

首先,我们打开 TsFile 并从文件路径path中获取一个ReadOnlyTsFile实例。

  1. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  2. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);

接下来,我们准备路径数组和查询表达式,然后通过这个接口得到最终的QueryExpression对象:

  1. QueryExpression queryExpression = QueryExpression.create(paths, statement);

ReadOnlyTsFile类有两个query方法来执行查询。

  • Method 1

    1. public QueryDataSet query(QueryExpression queryExpression) throws IOException
  • Method 2

    1. public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException

    此方法是为高级应用(如 TsFile-Spark 连接器)设计的。

    • 参数 : 对于 method 2,添加了两个额外的参数来支持部分查询(Partial Query):

      • partitionStartOffset: TsFile 的开始偏移量
      • partitionEndOffset: TsFile 的结束偏移量 ​

      什么是部分查询?

      在一些分布式文件系统中(比如:HDFS), 文件被分成几个部分,这些部分被称为”Blocks”并存储在不同的节点中。在涉及的每个节点上并行执行查询可以提高效率。因此需要部分查询(Partial Query)。部分查询(Partial Query)仅支持查询 TsFile 中被QueryConstant.PARTITION_START_OFFSETQueryConstant.PARTITION_END_OFFSET分割的部分。

QueryDataset 接口

上面执行的查询将返回一个QueryDataset对象。

下面是一些用户常用的接口:

  • bool hasNext();

    如果该数据集仍然有数据,则返回true。

  • List<Path> getPaths()

    获取这个数据集中的路径。

  • List<TSDataType> getDataTypes();

    获取数据类型。TSDataType 是一个 enum 类,其值如下:

    1. BOOLEAN,
    2. INT32,
    3. INT64,
    4. FLOAT,
    5. DOUBLE,
    6. TEXT;
  • RowRecord next() throws IOException;

    获取下一条记录。

    RowRecord类包含一个long类型的时间戳和一个List<Field>,用于不同传感器中的数据,我们可以使用两个getter方法来获取它们。

    1. long getTimestamp();
    2. List<Field> getFields();

    要从一个字段获取数据,请使用以下方法:

    1. TSDataType getDataType();
    2. Object getObjectValue();

读取现有 TsFile 示例

您需要安装 TsFile 到本地的 Maven 仓库中。

有关查询语句的更详细示例,请参见 /tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileRead.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
  5. import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
  6. import org.apache.iotdb.tsfile.read.common.Path;
  7. import org.apache.iotdb.tsfile.read.expression.IExpression;
  8. import org.apache.iotdb.tsfile.read.expression.QueryExpression;
  9. import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
  10. import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
  11. import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
  12. import org.apache.iotdb.tsfile.read.filter.TimeFilter;
  13. import org.apache.iotdb.tsfile.read.filter.ValueFilter;
  14. import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
  15. /**
  16. * The class is to show how to read TsFile file named "test.tsfile".
  17. * The TsFile file "test.tsfile" is generated from class TsFileWrite.
  18. * Run TsFileWrite to generate the test.tsfile first
  19. */
  20. public class TsFileRead {
  21. private static void queryAndPrint(ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement)
  22. throws IOException {
  23. QueryExpression queryExpression = QueryExpression.create(paths, statement);
  24. QueryDataSet queryDataSet = readTsFile.query(queryExpression);
  25. while (queryDataSet.hasNext()) {
  26. System.out.println(queryDataSet.next());
  27. }
  28. System.out.println("------------");
  29. }
  30. public static void main(String[] args) throws IOException {
  31. // file path
  32. String path = "test.tsfile";
  33. // create reader and get the readTsFile interface
  34. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  35. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
  36. // use these paths(all sensors) for all the queries
  37. ArrayList<Path> paths = new ArrayList<>();
  38. paths.add(new Path("device_1.sensor_1"));
  39. paths.add(new Path("device_1.sensor_2"));
  40. paths.add(new Path("device_1.sensor_3"));
  41. // no query statement
  42. queryAndPrint(paths, readTsFile, null);
  43. //close the reader when you left
  44. reader.close();
  45. }
  46. }

修改 TsFile 配置项

  1. TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  2. config.setXXX();