MapReduce TsFile

Outline

  • TsFile-Hadoop Connector User Guide
    • What is the TsFile-Hadoop Connector
    • System Requirements
    • Data Type Mapping
    • About TSFInputFormat
    • Usage Examples
      • Read example: sum
      • Write example: compute averages and write to a TsFile

TsFile-Hadoop Connector User Guide

What is the TsFile-Hadoop Connector

The TsFile-Hadoop connector enables Hadoop to read the external TsFile file format, so that users can read, write, and query TsFile files with Hadoop operations such as map and reduce.

With this connector, users can do the following (a minimal job-setup sketch follows the list):

  • load a single TsFile into Hadoop, whether it is stored on the local file system or in HDFS
  • load all files in a given directory into Hadoop, whether they are stored on the local file system or in HDFS
  • save the results of Hadoop processing in TsFile format
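The fragment below illustrates this wiring. It is not taken from the official example: the paths and job name are assumptions, and it relies on the generic FileInputFormat/FileOutputFormat path helpers (TSFInputFormat extends FileInputFormat, as described later in this guide).

    // minimal, illustrative fragment (paths and job name are assumptions)
    Job job = Job.getInstance(new Configuration(), "tsfile-demo");

    // read TsFiles through the connector; an input path may be a single file or a
    // whole directory, on the local file system or on HDFS
    job.setInputFormatClass(TSFInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("file:///data/example.tsfile"));
    FileInputFormat.addInputPath(job, new Path("hdfs://namenode:9000/tsfile-dir"));

    // write the job's results back in TsFile format through the connector
    job.setOutputFormatClass(TSFOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path("hdfs://namenode:9000/tsfile-out"));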

System Requirements

Hadoop version | Java version | TsFile version
2.7.3          | 1.8          | 0.9.3

Note: for how to download and use TsFile, please refer to https://github.com/apache/incubator-iotdb/tree/master/tsfile.

Data Type Mapping

TsFile data type | Hadoop Writable
BOOLEAN          | BooleanWritable
INT32            | IntWritable
INT64            | LongWritable
FLOAT            | FloatWritable
DOUBLE           | DoubleWritable
TEXT             | Text

About TSFInputFormat

TSFInputFormat extends Hadoop's FileInputFormat class and overrides the way input splits are generated.

The current splitting strategy places a ChunkGroup into a given split if the offset of the ChunkGroup's midpoint falls between that split's startOffset and endOffset.
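The exact logic lives inside TSFInputFormat; the rule can be sketched roughly as follows, where all variable names are made up for illustration:

    // sketch of the split rule: a ChunkGroup is assigned to the split whose byte range
    // [splitStartOffset, splitEndOffset) contains the midpoint of the ChunkGroup's byte range
    long midpoint = chunkGroupStartOffset + (chunkGroupEndOffset - chunkGroupStartOffset) / 2;
    boolean inThisSplit = splitStartOffset <= midpoint && midpoint < splitEndOffset;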

TSFInputFormat returns the data in a TsFile to the user as a series of MapWritable records.

Suppose we want to read the data of a device named d1 from a TsFile, and that the device has three sensors named s1, s2, and s3.

s1 is of type BOOLEAN, s2 is of type DOUBLE, and s3 is of type TEXT.

Each MapWritable record then has the following structure:

    {
        "time_stamp": 10000000,
        "device_id": d1,
        "s1": true,
        "s2": 3.14,
        "s3": "middle"
    }

In a Hadoop map job, you can obtain any value you want by its key, for example:

    mapwritable.get(new Text("s1"))

Note: all keys in the MapWritable are of type Text.
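As an illustrative fragment (not taken from the official example), the retrieved values can be cast according to the type mapping table above. The key names follow the d1/s1/s2/s3 example, and it is assumed here that the timestamp is exposed as a LongWritable:

    // inside map(NullWritable key, MapWritable value, Context context)
    long time = ((LongWritable) value.get(new Text("time_stamp"))).get();  // assumed LongWritable
    String deviceId = value.get(new Text("device_id")).toString();
    boolean s1 = ((BooleanWritable) value.get(new Text("s1"))).get();      // BOOLEAN -> BooleanWritable
    double s2 = ((DoubleWritable) value.get(new Text("s2"))).get();        // DOUBLE  -> DoubleWritable
    String s3 = value.get(new Text("s3")).toString();                      // TEXT    -> Text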

Usage Examples

Read example: sum

First, we need to tell TSFInputFormat which data to read:

    // configure whether to read the time column
    TSFInputFormat.setReadTime(job, true);
    // configure whether to read the deviceId column
    TSFInputFormat.setReadDeviceId(job, true);
    // configure which deviceIds to read
    String[] deviceIds = {"device_1"};
    TSFInputFormat.setReadDeviceIds(job, deviceIds);
    // configure which measurementIds to read
    String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
    TSFInputFormat.setReadMeasurementIds(job, measurementIds);

Then, the output key and value types of the mapper and reducer must be specified:

    // set the input format
    job.setInputFormatClass(TSFInputFormat.class);
    // set mapper output key and value
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    // set reducer output key and value
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);

Next, you can write the mapper and reducer classes that contain the actual data-processing logic:

    public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

        @Override
        protected void map(NullWritable key, MapWritable value,
            Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
            throws IOException, InterruptedException {
            Text deltaObjectId = (Text) value.get(new Text("device_id"));
            context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
        }
    }

    public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
            Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
            throws IOException, InterruptedException {
            double sum = 0;
            for (DoubleWritable value : values) {
                sum = sum + value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }

Note: the complete code example can be found at https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSFMRReadExample.java
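The snippets above omit the driver that ties the job together. A possible sketch is shown below, assuming the TSFInputFormat configuration and the mapper/reducer from the previous steps; the class name, the paths, and the choice of writing the per-device sums as plain text output are assumptions rather than part of the official example.

    public class TSFSumDriver {

        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "tsfile-sum");  // name is illustrative
            job.setJarByClass(TSFSumDriver.class);

            // the configuration from the previous snippets goes here:
            // TSFInputFormat.setReadTime / setReadDeviceId / setReadDeviceIds / setReadMeasurementIds,
            // job.setInputFormatClass(TSFInputFormat.class), and the map/reduce output classes

            job.setMapperClass(TSMapper.class);
            job.setReducerClass(TSReducer.class);

            // paths are assumptions; the input may also be a directory of TsFiles
            FileInputFormat.addInputPath(job, new Path("hdfs://namenode:9000/tsfile/input"));
            FileOutputFormat.setOutputPath(job, new Path("hdfs://namenode:9000/tsfile/sum-output"));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }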

Write example: compute averages and write to a TsFile

Apart from the output format and the output key/value classes shown below, the remaining configuration code is the same as in the read example above:

    job.setOutputFormatClass(TSFOutputFormat.class);
    // set mapper output key and value
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    // set reducer output key and value
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(HDFSTSRecord.class);

Then come the mapper and reducer classes that contain the actual data-processing logic:

    public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {

        @Override
        protected void map(NullWritable key, MapWritable value,
            Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
            throws IOException, InterruptedException {
            Text deltaObjectId = (Text) value.get(new Text("device_id"));
            long timestamp = ((LongWritable) value.get(new Text("time_stamp"))).get();
            if (timestamp % 100000 == 0) {
                context.write(deltaObjectId, new MapWritable(value));
            }
        }
    }

    /**
     * This reducer calculates the average value.
     */
    public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

        @Override
        protected void reduce(Text key, Iterable<MapWritable> values,
            Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context)
            throws IOException, InterruptedException {
            long sensor1_value_sum = 0;
            long sensor2_value_sum = 0;
            double sensor3_value_sum = 0;
            long num = 0;
            for (MapWritable value : values) {
                num++;
                sensor1_value_sum += ((LongWritable) value.get(new Text("sensor_1"))).get();
                sensor2_value_sum += ((LongWritable) value.get(new Text("sensor_2"))).get();
                sensor3_value_sum += ((DoubleWritable) value.get(new Text("sensor_3"))).get();
            }
            HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
            DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
            DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
            DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
            tsRecord.addTuple(dPoint1);
            tsRecord.addTuple(dPoint2);
            tsRecord.addTuple(dPoint3);
            context.write(NullWritable.get(), tsRecord);
        }
    }

Note: the complete code example can be found at https://github.com/apache/incubator-iotdb/blob/master/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TSMRWriteExample.java