编程-TsFile API

TsFile是我们在IoTDB中使用的时间序列的文件格式。 在本节中,我们要介绍这种文件格式的用法。

Ts文件库安装

在您自己的项目中有两种使用TsFile的方法。

  • 用作jars:

    • 编译源代码并构建为jars

      1. git clone https://github.com/apache/incubator-iotdb.git
      2. cd tsfile/
      3. mvn clean package -Dmaven.test.skip=true

      然后,所有的jar都可以放在名为“ target /”的文件夹中。 将target / tsfile-0.9.3-jar-with-dependencies.jar导入您的项目。

  • 用作Maven依赖项:

    编译源代码并通过三个步骤将其部署到本地存储库:

    • 获取源代码

      1. git clone https://github.com/apache/incubator-iotdb.git
    • 编译源代码并部署

      1. cd tsfile/
      2. mvn clean install -Dmaven.test.skip=true
    • 在项目中添加依赖项:

      1. <dependency>
      2. <groupId>org.apache.iotdb</groupId>
      3. <artifactId>tsfile</artifactId>
      4. <version>0.9.3</version>
      5. </dependency>
  1. 或者,您可以从官方Maven存储库下载依赖项:
  2. - 第一, 在路径上找到您的Maven`settings.xml`: `${username}\.m2\settings.xml` , `<profile>` 加入到 `<profiles>`:
  3. ```
  4. <profile>
  5. <id>allow-snapshots</id>
  6. <activation><activeByDefault>true</activeByDefault></activation>
  7. <repositories>
  8. <repository>
  9. <id>apache.snapshots</id>
  10. <name>Apache Development Snapshot Repository</name>
  11. <url>https://repository.apache.org/content/repositories/snapshots/</url>
  12. <releases>
  13. <enabled>false</enabled>
  14. </releases>
  15. <snapshots>
  16. <enabled>true</enabled>
  17. </snapshots>
  18. </repository>
  19. </repositories>
  20. </profile>
  21. ```
  22. - 然后将依赖项添加到您的项目中:
  23. ```
  24. <dependency>
  25. <groupId>org.apache.iotdb</groupId>
  26. <artifactId>tsfile</artifactId>
  27. <version>0.9.3</version>
  28. </dependency>
  29. ```

TSFile的用法

本节演示TsFile的详细用法。

时间序列数据

时间序列被视为四倍序列。 四元组定义为(设备,测量,时间,值)。

  • 测量: 时间序列进行的物理或形式测量,例如城市温度,某些商品的销售数量或火车在不同时间的速度。 由于传统的传感器(例如温度计)也需要进行一次测量并生成一个时间序列,因此我们将在下面互换使用测量和传感器。

  • 设备: 设备是指正在执行多个测量(产生多个时间序列)的实体,例如,运行中的火车监视其速度,油表,行驶里程,当前乘客均被传送到一个时间序列。

表1说明了一组时间序列数据。 下表中显示的集合包含一个名为“ device \ _1”的设备以及三个名为“ sensor \ _1”,“ sensor \ _2”和“ sensor \ _3”的测量值。

device_1
sensor_1sensor_2sensor_3
timevaluetimevaluetimevalue
11.2120250
31.4220451
51.1321652
71.8420853

一组时间序列数据

一行数据:在许多工业应用中,设备通常包含多个传感器,并且这些传感器可能在同一时间戳上具有值,这称为一行数据。

形式上,一行数据由一个device_id,一个指示自1970年1月1日以来的毫秒数,00:00:00组成的时间戳,以及由measurement_id和对应的value组成的几对数据组成。 一行中的所有数据对都属于该device_id,并且具有相同的时间戳。 如果其中一个measurementstimestamp中没有value ,请改用空格(实际上,TsFile不存储空值)。 其格式如下所示:

  1. device_id, timestamp, <measurement_id, value>...

一个示例如下所示。 在此示例中,两个测量的数据类型分别为“ INT32”和“ FLOAT”。

  1. device_1, 1490860659000, m1, 10, m2, 12.12

编写TsFile

生成一个TsFile文件

可以通过以下三个步骤来生成TsFile,完整的代码将在“编写TsFile的示例”部分中给出。

  • 一,构造一个TsFileWriter实例。

    以下是可用的构造函数:

    • 没有预定义的架构
    1. public TsFileWriter(File file) throws IOException
    • 使用预定义的架构
    1. public TsFileWriter(File file, Schema schema) throws IOException

    这是用于使用HDFS文件系统的。 TsFileOutput可以是HDFSOutput类的实例。

    1. public TsFileWriter(TsFileOutput output, Schema schema) throws IOException

    如果您想自己设置一些TSFile配置,则可以使用paramconfig。 例如:

    1. TSFileConfig conf = new TSFileConfig();
    2. conf.setTSFileStorageFs("HDFS");
    3. TsFileWriter tsFileWriter = new TsFileWriter(file, schema, conf);

    在此示例中,数据文件将存储在HDFS中,而不是本地文件系统中。 如果要将数据文件存储在本地文件系统中,则可以使用conf.setTSFileStorageFs(“ LOCAL”),这也是默认配置。

    您也可以通过config.setHdfsIp(...)config.setHdfsPort(...)配置HDFS的IP和端口。 默认ip为localhost,默认端口为9000

    Parameters:

    • file:要写入的TsFile

    • schema:文件模式将在下一部分中介绍。

    • config:TsFile的配置。

  • 二,添加测量

    或者,您可以先创建Schema类的实例,然后将其传递给TsFileWriter类的构造函数。Schema类包含一个映射,该映射的键是一个度量模式的名称,而值是该模式本身。

    这里是接口:

    1. // 创建一个空的架构或从现有的映射
    2. public Schema()
    3. public Schema(Map<String, MeasurementSchema> measurements)
    4. // Use this two interfaces to add measurements
    5. public void registerMeasurement(MeasurementSchema descriptor)
    6. public void registerMeasurements(Map<String, MeasurementSchema> measurements)
    7. // 一些有用的吸气剂和检查剂
    8. public TSDataType getMeasurementDataType(String measurementId)
    9. public MeasurementSchema getMeasurementSchema(String measurementId)
    10. public Map<String, MeasurementSchema> getAllMeasurementSchema()
    11. public boolean hasMeasurement(String measurementId)

    您始终可以在TsFileWriter类中使用以下接口来添加其他度量:

    1. public void addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException

    MeasurementSchema类包含一种度量的信息,有几种构造函数:

    1. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding)
    2. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType)
    3. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType,
    4. Map<String, String> props)

    参数:

    • measurementID:此测量的名称,通常是传感器的名称。

    • 类型:数据类型,现在支持六种类型:BOOLEANINT32INT64FLOATDOUBLETEXT

    • encoding:数据编码。 见 Chapter 2-3.

    • compression:数据压缩。 现在支持UNCOMPRESSEDSNAPPY

    • props:特殊数据类型的属性,例如FLOATDOUBLEmax_point_numberTEXTmax_string_length。 用作字符串对,例如(“ max_point_number”,“ 3”)。

  1. > \*\*注意:\*\*尽管一个度量名称可以在多个deltaObject中使用,但是不能更改属性。 不允许多次使用不同的类型或编码添加一个测量名称。 这是一个不好的例子:
  2. ```
  3. // 测量值“ sensor_1”为浮点型
  4. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
  5. // 此调用将引发WriteProcessException异常
  6. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
  7. ```
  • 第三,连续插入和写入数据。 ​ 使用此接口可以创建新的TSRecord(时间戳和设备对)。

    1. public TSRecord(long timestamp, String deviceId)

    然后创建一个DataPoint(一个测量值和一个值对),并使用addTuple方法将DataPoint添加到正确的TsRecord。 ​ 用这种方法写

    1. public void write(TSRecord record) throws IOException, WriteProcessException
  • 最后,调用close完成此编写过程。

    1. public void close() throws IOException
编写TsFile的示例

您应该将TsFile安装到本地Maven存储库。

  1. mvn clean install -pl tsfile -am -DskipTests

如果您具有未对齐(例如,并非所有传感器都包含值)的时间序列数据,则可以通过构造**TSRecord **来编写TsFile。

可以在以下位置找到更详尽的示例/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithTSRecord.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.File;
  3. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  4. import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
  5. import org.apache.iotdb.tsfile.write.TsFileWriter;
  6. import org.apache.iotdb.tsfile.write.record.TSRecord;
  7. import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
  8. import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
  9. import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
  10. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  11. /**
  12. * An example of writing data to TsFile
  13. * It uses the interface:
  14. * public void addMeasurement(MeasurementSchema MeasurementSchema) throws WriteProcessException
  15. */
  16. public class TsFileWriteWithTSRecord {
  17. public static void main(String args[]) {
  18. try {
  19. String path = "test.tsfile";
  20. File f = new File(path);
  21. if (f.exists()) {
  22. f.delete();
  23. }
  24. TsFileWriter tsFileWriter = new TsFileWriter(f);
  25. // add measurements into file schema
  26. tsFileWriter
  27. .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
  28. tsFileWriter
  29. .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
  30. tsFileWriter
  31. .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
  32. // construct TSRecord
  33. TSRecord tsRecord = new TSRecord(1, "device_1");
  34. DataPoint dPoint1 = new LongDataPoint("sensor_1", 1);
  35. DataPoint dPoint2 = new LongDataPoint("sensor_2", 2);
  36. DataPoint dPoint3 = new LongDataPoint("sensor_3", 3);
  37. tsRecord.addTuple(dPoint1);
  38. tsRecord.addTuple(dPoint2);
  39. tsRecord.addTuple(dPoint3);
  40. // write TSRecord
  41. tsFileWriter.write(tsRecord);
  42. // close TsFile
  43. tsFileWriter.close();
  44. } catch (Throwable e) {
  45. e.printStackTrace();
  46. System.out.println(e.getMessage());
  47. }
  48. }
  49. }

如果您具有对齐的时间序列数据,则可以通过构造**RowBatch **来编写TsFile。 ​ 可以在/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileWriteWithRowBatch.java中找到更详尽的示例。

  1. package org.apache.iotdb.tsfile;
  2. import java.io.File;
  3. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  4. import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
  5. import org.apache.iotdb.tsfile.write.TsFileWriter;
  6. import org.apache.iotdb.tsfile.write.schema.Schema;
  7. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  8. import org.apache.iotdb.tsfile.write.record.RowBatch;
  9. /**
  10. * An example of writing data with RowBatch to TsFile
  11. */
  12. public class TsFileWriteWithRowBatch {
  13. public static void main(String[] args) {
  14. try {
  15. String path = "test.tsfile";
  16. File f = new File(path);
  17. if (f.exists()) {
  18. f.delete();
  19. }
  20. Schema schema = new Schema();
  21. // the number of rows to include in the row batch
  22. int rowNum = 1000000;
  23. // the number of values to include in the row batch
  24. int sensorNum = 10;
  25. // add measurements into file schema (all with INT64 data type)
  26. for (int i = 0; i < sensorNum; i++) {
  27. schema.registerMeasurement(
  28. new MeasurementSchema("sensor_" + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
  29. }
  30. // add measurements into TSFileWriter
  31. TsFileWriter tsFileWriter = new TsFileWriter(f, schema);
  32. // construct the row batch
  33. RowBatch rowBatch = schema.createRowBatch("device_1");
  34. long[] timestamps = rowBatch.timestamps;
  35. Object[] values = rowBatch.values;
  36. long timestamp = 1;
  37. long value = 1000000L;
  38. for (int r = 0; r < rowNum; r++, value++) {
  39. int row = rowBatch.batchSize++;
  40. timestamps[row] = timestamp++;
  41. for (int i = 0; i < sensorNum; i++) {
  42. long[] sensor = (long[]) values[i];
  43. sensor[row] = value;
  44. }
  45. // write RowBatch to TsFile
  46. if (rowBatch.batchSize == rowBatch.getMaxBatchSize()) {
  47. tsFileWriter.write(rowBatch);
  48. rowBatch.reset();
  49. }
  50. }
  51. // write RowBatch to TsFile
  52. if (rowBatch.batchSize != 0) {
  53. tsFileWriter.write(rowBatch);
  54. rowBatch.reset();
  55. }
  56. // close TsFile
  57. tsFileWriter.close();
  58. } catch (Throwable e) {
  59. e.printStackTrace();
  60. System.out.println(e.getMessage());
  61. }
  62. }
  63. }

读取TsFile的接口

开始之前

“时间序列数据”部分中的时间序列数据集在此用于本节的具体介绍。 下表中显示的集合包含一个名为“ device \ _1”的deltaObject和三个名为“ sensor \ _1”,“ sensor \ _2”和“ sensor \ _3”的测量值。 并简化了测量以进行简单说明,每个仅包含4个时间值对。

device_1
sensor_1sensor_2sensor_3
timevaluetimevaluetimevalue
11.2120250
31.4220451
51.1321652
71.8420853

一组时间序列数据

路径定义

路径是用点分隔的字符串,该字符串在TsFile中唯一标识时间序列,例如“ root.area_1.device_1.sensor_1”。 最后一部分“ sensor_1”称为“ measurementId”,其余部分“ root.area_1.device_1”称为deviceId。 如上所述,不同设备中的相同测量具有相同的数据类型和编码,并且设备也是唯一的。

在读取接口中,参数路径指示要选择的测量。 通过类Path可以轻松构造Path实例。 例如:

  1. Path p = new Path("device_1.sensor_1");

我们将传递路径的ArrayList进行最终查询,以支持多个路径。

  1. List<Path> paths = new ArrayList<Path>();
  2. paths.add(new Path("device_1.sensor_1"));
  3. paths.add(new Path("device_1.sensor_3"));

**注意:**在构造路径时,参数的格式应为点分隔的字符串,最后一部分将被识别为MeasurementId,而其余部分将被识别为deviceId。

过滤器的定义
使用场景

在TsFile读取过程中使用筛选器来选择满足一个或多个给定条件的数据。

IExpression

IExpression是一个过滤器表达式接口,它将被传递给我们的最终查询调用。 我们创建一个或多个过滤器表达式,并可以使用二进制过滤器运算符将它们链接到最终表达式。

  • 创建一个过滤器表达式

    有两种类型的过滤器。

    • TimeFilter:时间序列数据中时间的过滤器。

      1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter);

      使用以下关系获取TimeFilter对象(值是一个long int变量)。

      RelationshipDescription
      TimeFilter.eq(value)Choose the time equal to the value
      TimeFilter.lt(value)Choose the time less than the value
      TimeFilter.gt(value)Choose the time greater than the value
      TimeFilter.ltEq(value)Choose the time less than or equal to the value
      TimeFilter.gtEq(value)Choose the time greater than or equal to the value
      TimeFilter.notEq(value)Choose the time not equal to the value
      TimeFilter.not(TimeFilter)Choose the time not satisfy another TimeFilter
    • ValueFilter:时间序列数据中值的过滤器。

      1. IExpression valueFilterExpr = new SingleSeriesExpression(Path, ValueFilter);

      ValueFilter的用法与TimeFilter的用法相同,只是要确保值的类型等于度量值(在路径中定义)。

  • 二元滤波器运算符

    二进制过滤器运算符可用于链接两个单个表达式。

    • BinaryExpression.and(Expression,Expression):选择两个表达式都满足的值。
    • BinaryExpression.or(Expression,Expression):选择至少满足一个表达式的值。
过滤器表达式示例
  • TimeFilterExpression示例

    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.eq(15)); // series time = 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.ltEq(15)); // series time <= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.lt(15)); // series time < 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.gtEq(15)); // series time >= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.notEq(15)); // series time != 15
    1. IExpression timeFilterExpr = BinaryExpression.and(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // 15 <= series time < 25
    1. IExpression timeFilterExpr = BinaryExpression.or(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // series time >= 15 or series time < 25
读取界面

首先,我们打开TsFile并从文件路径字符串path获取一个ReadOnlyTsFile实例。

  1. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  2. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);

接下来,我们准备路径数组和查询表达式,然后通过此接口获取最终的QueryExpression对象:

  1. QueryExpression queryExpression = QueryExpression.create(paths, statement);

ReadOnlyTsFile类具有两种查询方法来执行查询。

  • 方法1

    1. public QueryDataSet query(QueryExpression queryExpression) throws IOException
  • 方法二

    1. public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException

    此方法设计用于高级应用程序,例如TsFile-Spark连接器。

    • 参数 : 对于方法2,添加了两个附加参数以支持部分查询:

      • partitionStartOffset: TsFile的起始偏移量
      • partitionEndOffset: TsFile的结束偏移量

      什么是部分查询?

      在某些分布式文件系统(例如HDFS)中,文件被分成几个部分,这些部分称为“块”,并存储在不同的节点中。 在涉及的每个节点中并行执行查询可以提高效率。 因此,需要部分查询。 Paritial Query只选择存储在零件中的结果,该零件由TsFile的QueryConstant.PARTITION_START_OFFSETQueryConstant.PARTITION_END_OFFSET分开。

QueryDataset接口

上面执行的查询将返回一个QueryDataset对象。

这是对用户有用的界面。

  • bool hasNext();

    如果此数据集仍包含元素,则返回true。

  • List<Path> getPaths()

    获取此数据集中的路径。

  • List<TSDataType> getDataTypes();

    获取数据类型。 TSDataType类是一个枚举类,值将是以下值之一:

    1. BOOLEAN,
    2. INT32,
    3. INT64,
    4. FLOAT,
    5. DOUBLE,
    6. TEXT;
  • RowRecord next() throws IOException;

    获取下一条记录。

    RowRecord类由一个很长的时间戳和一个List <Field>组成,用于不同传感器中的数据,我们可以使用两种getter方法来获取它们。

    1. long getTimestamp();
    2. List<Field> getFields();

    要从一个字段获取数据,请使用以下方法:

    1. TSDataType getDataType();
    2. Object getObjectValue();
读取现有TsFile的示例

您应该将TsFile安装到本地Maven存储库。

有关查询语句的更详尽示例,请参见: /tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileRead.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
  5. import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
  6. import org.apache.iotdb.tsfile.read.common.Path;
  7. import org.apache.iotdb.tsfile.read.expression.IExpression;
  8. import org.apache.iotdb.tsfile.read.expression.QueryExpression;
  9. import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
  10. import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
  11. import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
  12. import org.apache.iotdb.tsfile.read.filter.TimeFilter;
  13. import org.apache.iotdb.tsfile.read.filter.ValueFilter;
  14. import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
  15. /**
  16. * 该类将显示如何读取名为“ test.ts文件”的Ts文件文件。
  17. * TsFile文件“ test.tsfile”是从类TsFileWrite生成的。
  18. * 运行TsFileWrite首先生成test.tsfile
  19. */
  20. public class TsFileRead {
  21. private static void queryAndPrint(ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement)
  22. throws IOException {
  23. QueryExpression queryExpression = QueryExpression.create(paths, statement);
  24. QueryDataSet queryDataSet = readTsFile.query(queryExpression);
  25. while (queryDataSet.hasNext()) {
  26. System.out.println(queryDataSet.next());
  27. }
  28. System.out.println("------------");
  29. }
  30. public static void main(String[] args) throws IOException {
  31. // 文件路径
  32. String path = "test.tsfile";
  33. // 创建阅读器并获取readTsFile接口
  34. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  35. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
  36. // 使用这些路径(所有传感器)进行所有查询
  37. ArrayList<Path> paths = new ArrayList<>();
  38. paths.add(new Path("device_1.sensor_1"));
  39. paths.add(new Path("device_1.sensor_2"));
  40. paths.add(new Path("device_1.sensor_3"));
  41. // 没有查询语句
  42. queryAndPrint(paths, readTsFile, null);
  43. //离开时关闭阅读器
  44. reader.close();
  45. }
  46. }

修改 TsFile 配置项

  1. TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
  2. config.setXXX()

布隆过滤器

布隆过滤器在加载元数据之前检查给定的时间序列是否在ts文件中。 这样可以提高加载元数据的性能,并跳过不包含指定时间序列的tsfile。 如果要了解有关其机制的更多信息,可以参考: wiki page of bloom filterTsFile API - 图1.

配置

您可以通过修改 TSFileConfig 里的 bloomFilterErrorRate 参数来控制Bloom过滤器的误报率。

  1. # The acceptable error rate of bloom filter, should be in [0.01, 0.1], default is 0.05
  2. bloom_filter_error_rate=0.05