用法

现在,您准备开始使用TsFile做一些很棒的事情。 本节演示TsFile的详细用法。

时间序列数据

时间序列被视为四倍序列。 四元组定义为(设备,测量,时间,值)。

  • 测量: 时间序列进行的物理或形式测量,例如城市温度,某些商品的销售数量或火车在不同时间的速度。 由于传统的传感器(例如温度计)也需要进行一次测量并生成一个时间序列,因此我们将在下面互换使用测量和传感器。

  • 设备: 设备是指正在执行多个测量(产生多个时间序列)的实体,例如,运行中的火车监视其速度,油表,行驶里程,当前乘客均被传送到一个时间序列。

表1说明了一组时间序列数据。 下表中显示的集合包含一个名为“ device_1”的设备以及三个名为“ sensor_1”,“ sensor_2”和“ sensor_3”的测量值。

device_1
sensor_1sensor_2sensor_3
timevaluetimevaluetimevalue
11.2120250
31.4220451
51.1321652
71.8420853

一组时间序列数据

一行数据: 在许多工业应用中,设备通常包含多个传感器,并且这些传感器可能在同一时间戳上具有值,这称为一行数据。

形式上,一行数据由一个device_id,一个指示自1970年1月1日以来的毫秒数,00:00:00组成的时间戳,以及由measurement_id和对应的value组成的几对数据组成。 一行中的所有数据对都属于该device_id,并且具有相同的时间戳。 如果其中一个measurementsvalue 中没有value,请改用空格(实际上,TsFile不存储空值)。 其格式如下所示:

  1. device_id, timestamp, <measurement_id, value>...

一个示例如下所示。 在此示例中,两个测量的数据类型分别为INT32和FLOAT。

  1. device_1, 1490860659000, m1, 10, m2, 12.12

编写TsFile

生成一个TsFile文件。

可以通过以下三个步骤来生成TsFile,完整的代码将在“编写TsFile的示例”部分中给出。

  • 首先,构造一个TsFileWriter实例。

    以下是可用的构造函数:

    • 没有预定义的架构
    1. public TsFileWriter(File file) throws IOException
    • 使用预定义的架构
    1. public TsFileWriter(File file, FileSchema schema) throws IOException

    这是用于使用HDFS文件系统的。 TsFileOutput可以是HDFSOutput类的实例。

    1. public TsFileWriter(TsFileOutput output, FileSchema schema) throws IOException

    参量:

    • 文件: 要写的TsFile

    • 模式: 文件模式将在下一部分中介绍。

  • 二,添加测量

    或者,您可以先创建FileSchema类的实例,然后将其传递给TsFileWriter类的构造函数。

    FileSchema类包含一个映射,该映射的键是一个测量模式的名称,而值是该模式本身。

    这里是接口:

    1. // 创建一个空的FileSchema或从现有的映射
    2. public FileSchema()
    3. public FileSchema(Map<String, MeasurementSchema> measurements)
    4. // 使用这两个界面添加测量
    5. public void registerMeasurement(MeasurementSchema descriptor)
    6. public void registerMeasurements(Map<String, MeasurementSchema> measurements)
    7. // 一些有用的吸气剂和检查剂
    8. public TSDataType getMeasurementDataType(String measurementId)
    9. public MeasurementSchema getMeasurementSchema(String measurementId)
    10. public Map<String, MeasurementSchema> getAllMeasurementSchema()
    11. public boolean hasMeasurement(String measurementId)

    您可以始终在TsFileWriter类中使用以下接口来添加其他度量: ​

    1. public void addMeasurement(MeasurementSchema measurementSchema) throws WriteProcessException

    MeasurementSchema类包含一种度量的信息,有几种构造函数:

    1. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding)
    2. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType)
    3. public MeasurementSchema(String measurementId, TSDataType type, TSEncoding encoding, CompressionType compressionType,
    4. Map<String, String> props)

    参数:

    • measurementID:此测量的名称,通常是传感器的名称。

    • 类型:数据类型,现在支持六种类型:BOOLEANINT32INT64FLOATDOUBLETEXT

    • encoding:数据编码。 看到 章节 2-3.

    • compression:数据压缩。 现在支持UNCOMPRESSEDSNAPPY

    • props:特殊数据类型的属性,例如FLOATDOUBLEmax_point_numberTEXTmax_string_length。 用作字符串对,例如(“ max_point_number”,“ 3”)。

  1. > \*\*注意:\*\*尽管一个度量名称可以在多个deltaObject中使用,但是不能更改属性。 不允许多次使用不同的类型或编码添加一个测量名称。 这是一个不好的例子:
  2. ```
  3. // 测量值“ sensor_1”为浮点型
  4. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
  5. // 此调用将引发WriteProcessException异常
  6. addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT32, TSEncoding.RLE));
  7. ```
  • 第三,连续插入和写入数据。

    使用此接口可以创建新的TSRecord(时间戳和设备对)。

    1. public TSRecord(long timestamp, String deviceId)

    然后创建一个DataPoint(一个测量值和一个值对),并使用addTuple方法将DataPoint添加到正确的TsRecord。

    用这种方法写

    1. public void write(TSRecord record) throws IOException, WriteProcessException
  • 最后,调用close完成此编写过程。

    1. public void close() throws IOException

编写TsFile的示例

您应该将TsFile安装到本地Maven存储库。

  1. mvn clean install -pl tsfile -am -DskipTests

可以在以下位置找到更详尽的示例/tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileWrite.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.File;
  3. import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
  4. import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
  5. import org.apache.iotdb.tsfile.write.TsFileWriter;
  6. import org.apache.iotdb.tsfile.write.record.TSRecord;
  7. import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
  8. import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
  9. import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
  10. import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
  11. /**
  12. * 将数据写入TsFile的示例
  13. * 它使用以下接口:
  14. * 公共无效addMeasurement(MeasurementSchema MeasurementSchema)抛出WriteProcessException
  15. */
  16. public class TsFileWrite {
  17. public static void main(String args[]) {
  18. try {
  19. String path = "test.tsfile";
  20. File f = new File(path);
  21. if (f.exists()) {
  22. f.delete();
  23. }
  24. TsFileWriter tsFileWriter = new TsFileWriter(f);
  25. // 将度量添加到文件模式
  26. tsFileWriter
  27. .addMeasurement(new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.RLE));
  28. tsFileWriter
  29. .addMeasurement(new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.RLE));
  30. tsFileWriter
  31. .addMeasurement(new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.RLE));
  32. // 构造TSRecord
  33. TSRecord tsRecord = new TSRecord(1, "device_1");
  34. DataPoint dPoint1 = new LongDataPoint("sensor_1", 1);
  35. DataPoint dPoint2 = new LongDataPoint("sensor_2", 2);
  36. DataPoint dPoint3 = new LongDataPoint("sensor_3", 3);
  37. tsRecord.addTuple(dPoint1);
  38. tsRecord.addTuple(dPoint2);
  39. tsRecord.addTuple(dPoint3);
  40. // 写TSRecord
  41. tsFileWriter.write(tsRecord);
  42. // 关闭TsFile
  43. tsFileWriter.close();
  44. } catch (Throwable e) {
  45. e.printStackTrace();
  46. System.out.println(e.getMessage());
  47. }
  48. }
  49. }

读取TsFile的接口

开始之前

“时间序列数据”部分中的时间序列数据集在此用于本节的具体介绍。 下表中显示的集合包含一个名为“ device \ _1”的deltaObject和三个名为“ sensor \ _1”,“ sensor \ _2”和“ sensor \ _3”的测量值。 并简化了测量以进行简单说明,每个仅包含4个时间值对。

关系描述
TimeFilter.eq(value)选择等于值的时间
TimeFilter.lt(value)选择小于时间的时间
TimeFilter.gt(value)选择大于值的时间
TimeFilter.ltEq(value)选择小于或等于该值的时间
TimeFilter.gtEq(value)选择大于或等于该值的时间
TimeFilter.notEq(value)选择不等于该值的时间
TimeFilter.not(TimeFilter)选择不满足另一个TimeFilter的时间

路径定义

路径是用点分隔的字符串,该字符串在TsFile中唯一标识时间序列,例如“ root.area_1.device_1.sensor_1”。 最后一部分“ sensor_1”称为“ measurementId”,其余部分“ root.area_1.device_1”称为deviceId。 如上所述,不同设备中的相同测量具有相同的数据类型和编码,并且设备也是唯一的。

在读取接口中,参数paths指示要选择的测量。

路径实例可以通过Path类很容易地构造。 例如:

  1. Path p = new Path("device_1.sensor_1");

我们将传递路径的ArrayList进行最终查询,以支持多个路径。

  1. List<Path> paths = new ArrayList<Path>();
  2. paths.add(new Path("device_1.sensor_1"));
  3. paths.add(new Path("device_1.sensor_3"));

**注意:**在构造路径时,参数的格式应为点分隔的字符串,最后一部分将被识别为MeasurementId,而其余部分将被识别为deviceId。

过滤器的定义

使用场景

在TsFile读取过程中使用筛选器来选择满足一个或多个给定条件的数据。

IExpression

IExpression是一个过滤器表达式接口,它将传递给我们的最终查询调用。 我们创建一个或多个过滤器表达式,并可以使用二进制过滤器运算符将它们链接到最终表达式。

  • 创建一个过滤器表达式

    有两种类型的过滤器。

    • 时间过滤器: 时间序列数据中时间的过滤器。

      1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter);
  1. ```
  2. 使用以下关系获取`TimeFilter`对象(值是一个long int变量)。
  3. | 关系 | 描述 |
  4. | :------------------------: | :------------------------------: |
  5. | TimeFilter.eq(value) | 选择等于值的时间 |
  6. | TimeFilter.lt(value) | 选择小于时间的时间 |
  7. | TimeFilter.gt(value) | 选择大于值的时间 |
  8. | TimeFilter.ltEq(value) | 选择小于或等于该值的时间 |
  9. | TimeFilter.gtEq(value) | 选择大于或等于该值的时间 |
  10. | TimeFilter.notEq(value) | 选择不等于该值的时间 |
  11. | TimeFilter.not(TimeFilter) | 选择不满足另一个TimeFilter的时间 |
  12. * ValueFilter:时间序列数据中`value`的过滤器。
  13. ```
  14. IExpression valueFilterExpr = new SingleSeriesExpression(Path, ValueFilter);
  15. ```
  16. ​ `ValueFilter`的用法与使用`TimeFilter`的用法相同,只是要确保值的类型等于测量值(在路径中定义)。
  17. ```
  • 二元滤波器运算符

    二进制过滤器运算符可用于链接两个单个表达式。

    • BinaryExpression.and(Expression,Expression):选择两个表达式都满足的值。
    • BinaryExpression.or(Expression,Expression):选择至少满足一个表达式的值。
过滤器表达式示例
  • TimeFilterExpression示例

    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.eq(15)); // series time = 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.ltEq(15)); // series time <= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.lt(15)); // series time < 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.gtEq(15)); // series time >= 15
    1. IExpression timeFilterExpr = new GlobalTimeExpression(TimeFilter.notEq(15)); // series time != 15
    1. IExpression timeFilterExpr = BinaryExpression.and(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // 15 <= series time < 25
    1. IExpression timeFilterExpr = BinaryExpression.or(new GlobalTimeExpression(TimeFilter.gtEq(15L)),
    2. new GlobalTimeExpression(TimeFilter.lt(25L))); // series time >= 15 or series time < 25

读取界面

首先,我们打开TsFile并从文件路径字符串path获取一个ReadOnlyTsFile实例。

  1. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  2. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);

接下来,我们准备路径数组和查询表达式,然后通过此接口获取最终的QueryExpression对象:

  1. QueryExpression queryExpression = QueryExpression.create(paths, statement);

ReadOnlyTsFile类具有两个query方法来执行查询。

  • 方法1

    1. public QueryDataSet query(QueryExpression queryExpression) throws IOException
  • 方法二

    1. public QueryDataSet query(QueryExpression queryExpression, long partitionStartOffset, long partitionEndOffset) throws IOException

    此方法设计用于高级应用程序,例如TsFile-Spark连接器。

    • 参数: 对于方法2,添加了两个附加参数以支持部分查询:

      • partitionStartOffset:TsFile的起始偏移量
      • partitionEndOffset: TsFile的结束偏移量

      **什么是部分查询?**在某些分布式文件系统(例如HDFS)中,文件被分成几个部分,这些部分称为“块”,并存储在不同的节点中。 在涉及的每个节点中并行执行查询可以提高效率。 因此,需要部分查询。 Paritial Query只选择存储在零件中的结果,该零件由TsFile的QueryConstant.PARTITION_START_OFFSETQueryConstant.PARTITION_END_OFFSET分开。

QueryDataset接口

上面执行的查询将返回一个QueryDataset对象。

这是对用户有用的界面。

  • bool hasNext();

    如果此数据集仍包含元素,则返回true。

  • List<Path> getPaths()

    获取此数据集中的路径。

  • List<TSDataType> getDataTypes();

    获取数据类型。 TSDataType类是一个枚举类,值将是以下值之一:

    1. BOOLEAN,
    2. INT32,
    3. INT64,
    4. FLOAT,
    5. DOUBLE,
    6. TEXT;
  • RowRecord next() throws IOException;

    获取下一条记录。

    RowRecord类由一个long时间戳和一个List <Field>组成,用于不同传感器中的数据,我们可以使用两种getter方法来获取它们。

    1. long getTimestamp();
    2. List<Field> getFields();

    要从一个字段获取数据,请使用以下方法:

    1. TSDataType getDataType();
    2. Object getObjectValue();

读取现有TsFile的示例

您应该将TsFile安装到本地Maven存储库。

  1. mvn clean install -pl tsfile -am -DskipTests

有关查询语句的更详尽示例,请参见: /tsfile/example/src/main/java/org/apache/iotdb/tsfile/TsFileRead.java

  1. package org.apache.iotdb.tsfile;
  2. import java.io.IOException;
  3. import java.util.ArrayList;
  4. import org.apache.iotdb.tsfile.read.ReadOnlyTsFile;
  5. import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
  6. import org.apache.iotdb.tsfile.read.common.Path;
  7. import org.apache.iotdb.tsfile.read.expression.IExpression;
  8. import org.apache.iotdb.tsfile.read.expression.QueryExpression;
  9. import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
  10. import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
  11. import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
  12. import org.apache.iotdb.tsfile.read.filter.TimeFilter;
  13. import org.apache.iotdb.tsfile.read.filter.ValueFilter;
  14. import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
  15. /**
  16. * 该类将显示如何读取名为“ test.ts文件”的Ts文件文件。
  17. * TsFile文件“ test.tsfile”是从类TsFileWrite生成的。
  18. * 运行TsFileWrite首先生成test.tsfile
  19. */
  20. public class TsFileRead {
  21. private static void queryAndPrint(ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement)
  22. throws IOException {
  23. QueryExpression queryExpression = QueryExpression.create(paths, statement);
  24. QueryDataSet queryDataSet = readTsFile.query(queryExpression);
  25. while (queryDataSet.hasNext()) {
  26. System.out.println(queryDataSet.next());
  27. }
  28. System.out.println("------------");
  29. }
  30. public static void main(String[] args) throws IOException {
  31. // 文件路径
  32. String path = "test.tsfile";
  33. // 创建阅读器并获取readTsFile接口
  34. TsFileSequenceReader reader = new TsFileSequenceReader(path);
  35. ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader);
  36. // 使用这些路径(所有传感器)进行所有查询
  37. ArrayList<Path> paths = new ArrayList<>();
  38. paths.add(new Path("device_1.sensor_1"));
  39. paths.add(new Path("device_1.sensor_2"));
  40. paths.add(new Path("device_1.sensor_3"));
  41. // 没有查询语句
  42. queryAndPrint(paths, readTsFile, null);
  43. // 离开时关闭阅读器
  44. reader.close();
  45. }
  46. }

用户指定的配置文件路径

默认配置文件tsfile-format.properties.template位于/ tsfile / src / main / resources目录中。 如果要使用自己的路径,则可以:

  1. System.setProperty(TsFileConstant.TSFILE_CONF, "your config file path");

然后调用:

  1. TSFileConfig config = TSFileDescriptor.getInstance().getConfig();