TsFile-Flink-Connector implements Flink support for external data sources of the TsFile type. This allows users to read, write, and query TsFiles via Flink DataStream/DataSet.

With this connector, you can:

  • Load a single TsFile, or multiple TsFiles (DataSet only), from the local file system or HDFS into Flink.
  • Load all files in a specific directory on the local file system or HDFS into Flink (see the sketch after this list).
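
Directory loading needs no dedicated call in these examples. The following is a minimal sketch, assuming TsFileInputFormat inherits Flink's FileInputFormat behavior (setFilePath accepts a directory, and setNestedFileEnumeration enables recursion into subdirectories); `inputFormat` and `env` are constructed as in the Quick Start below:

```java
// Hedged sketch: point the input format at a directory instead of a single file.
// Flink's FileInputFormat then enumerates every file under that path.
inputFormat.setFilePath("hdfs:///data/tsfiles");  // hypothetical directory of TsFiles
inputFormat.setNestedFileEnumeration(true);       // also recurse into subdirectories
DataSet<Row> source = env.createInput(inputFormat);
```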

Quick Start

TsFileInputFormat Example

  1. Create a TsFileInputFormat with the default RowRowRecordParser:

```java
String[] fieldNames = {
    QueryConstant.RESERVED_TIME,
    "device_1.sensor_1",
    "device_1.sensor_2",
    "device_1.sensor_3",
    "device_2.sensor_1",
    "device_2.sensor_2",
    "device_2.sensor_3"
};
TypeInformation[] typeInformations = new TypeInformation[] {
    Types.LONG,   // time
    Types.FLOAT,  // device_1.sensor_1
    Types.INT,    // device_1.sensor_2
    Types.INT,    // device_1.sensor_3
    Types.FLOAT,  // device_2.sensor_1
    Types.INT,    // device_2.sensor_2
    Types.INT     // device_2.sensor_3
};

// Every series except the reserved time column becomes a query path.
List<Path> paths = Arrays.stream(fieldNames)
    .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
    .map(Path::new)
    .collect(Collectors.toList());

RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);
QueryExpression queryExpression = QueryExpression.create(paths, null); // null = no filter
RowRowRecordParser parser =
    RowRowRecordParser.create(rowTypeInfo, queryExpression.getSelectedSeries());
TsFileInputFormat<Row> inputFormat = new TsFileInputFormat<>(queryExpression, parser);
```
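
In the snippet above, the second argument to QueryExpression.create is the filter, and `null` means the whole file is scanned. As a hedged sketch, assuming the TsFile read API's IExpression, GlobalTimeExpression, and TimeFilter types, a time filter could be attached instead:

```java
// Hypothetical sketch: only read data points with timestamp > 100.
IExpression timeFilter = new GlobalTimeExpression(TimeFilter.gt(100L));
QueryExpression filteredExpression = QueryExpression.create(paths, timeFilter);
TsFileInputFormat<Row> filteredInput = new TsFileInputFormat<>(filteredExpression, parser);
```
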
  2. Read data from the input format and print it to stdout:

DataStream:

```java
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataStream<Row> source = senv.createInput(inputFormat);
DataStream<String> rowString = source.map(Row::toString);
// Pull the results back to the client and print them.
Iterator<String> result = DataStreamUtils.collect(rowString);
while (result.hasNext()) {
    System.out.println(result.next());
}
```

DataSet:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
inputFormat.setFilePath("source.tsfile");
DataSet<Row> source = env.createInput(inputFormat);
List<String> result = source.map(Row::toString).collect();
for (String s : result) {
    System.out.println(s);
}
```

TSRecordOutputFormat Example

  1. Create a TSRecordOutputFormat with the default RowTSRecordConverter:

```java
String[] fieldNames = {
    QueryConstant.RESERVED_TIME,
    "device_1.sensor_1",
    "device_1.sensor_2",
    "device_1.sensor_3",
    "device_2.sensor_1",
    "device_2.sensor_2",
    "device_2.sensor_3"
};
TypeInformation[] typeInformations = new TypeInformation[] {
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG,
    Types.LONG
};
RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, fieldNames);

// Register the measurement schemas shared by both devices under one template.
Schema schema = new Schema();
schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));

RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
TSRecordOutputFormat<Row> outputFormat = new TSRecordOutputFormat<>(schema, converter);
```
  2. Write data through the output format:

DataStream:

```java
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
// A single writer task, so all rows end up in one TsFile.
senv.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
outputFormat.setOutputFilePath(new org.apache.flink.core.fs.Path(path)); // path: target TsFile location
DataStream<Tuple7> source = senv.fromCollection(
    data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
// Convert each Tuple7 into a Row before handing it to the output format.
source.map(t -> {
    Row row = new Row(7);
    for (int i = 0; i < 7; i++) {
        row.setField(i, t.getField(i));
    }
    return row;
}).returns(rowTypeInfo).writeUsingOutputFormat(outputFormat);
senv.execute();
```

DataSet:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
List<Tuple7> data = new ArrayList<>(7);
data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
DataSet<Tuple7> source = env.fromCollection(
    data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
source.map(t -> {
    Row row = new Row(7);
    for (int i = 0; i < 7; i++) {
        row.setField(i, t.getField(i));
    }
    return row;
}).returns(rowTypeInfo).write(outputFormat, path);
env.execute();
```
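
As a quick sanity check, the file written above can be read back with the TsFileInputFormat from the first example. The sketch below only recombines calls already shown, reusing this example's fieldNames and all-LONG rowTypeInfo; `path` is the output location used by the write job:

```java
// Hedged sketch: read back the file written above and print its rows.
List<Path> readPaths = Arrays.stream(fieldNames)
    .filter(s -> !s.equals(QueryConstant.RESERVED_TIME))
    .map(Path::new)
    .collect(Collectors.toList());
QueryExpression readExpression = QueryExpression.create(readPaths, null);
RowRowRecordParser readParser =
    RowRowRecordParser.create(rowTypeInfo, readExpression.getSelectedSeries());
TsFileInputFormat<Row> readBack = new TsFileInputFormat<>(readExpression, readParser);
readBack.setFilePath(path);
DataSet<Row> rows = env.createInput(readBack);
rows.map(Row::toString).collect().forEach(System.out::println);
```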