9.HBase and MapReduce

Apache MapReduce是一种用来分析大量数据的软件框架,是Hadoop最常使用的框架。MapReduce本身超出本文范围。 A good place to get started with MapReduce is http://hadoop.apache.org/docs/r2.6.0/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html。 MapReduce version 2 (MR2)is now part of YARN.

本章讨论下要在HBase中使用MapReduce需要指定的配置步骤。另外,讨论下其他HBase和MapReduce工作之间的相互作用和问题。 Finally, it discusses Cascading, an alternative API for MapReduce。

46.HBase, Mapreduce, and the CLASSPATH

默认情况下,MapReduce job部署在MR集群上,不会与$HBASE_CONF_DIR下的HBase配置或HBase类产生关联。

要使MapReduce工作获得它们需要的,你要将hbase-site.xml放入$HADOOP_HOME/conf目录,并将HBase jar放入$HADOOP_HOME/lib目录,每个集群都要如此配置。或者你可以编辑$HADOOP_HOME/conf/hadoop-env.sh,并把它加入到HADOOP_CLASSPATH 变量中,但并不推荐你这种方法,因为这会影响你根据Hbase referenc的Hadoop安装。这也需要你重启hadoop集群以使Hadoop能够使用Hbase data。

建议的方法是使HBase自己增加他的依赖jars并使用HADOOP_CLASSPATH or -libjars

从0.90.x起,HBase可自动将依赖jars加入到job配置中,只需保证相关jars存在于本地的CLASSPATH。下面的事例运行了绑定的HBase RowCounter MR工作,其中表名叫usertable。如果你没有设置命令中(该命令以$为前缀,并被花括号包围)预期的环境变量,可以使用真实系统路径替代。确保为你的系统使用正确的HBase jar。单引号会使shell执行子命令setting the output of hbase classpath (the command to dump HBase CLASSPATH) to HADOOP_CLASSPATH.This example assumes you use a BASH-compatible shell.

  1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/lib/hbase-server-VERSION.jar rowcounter usertable

当这个命令运行时,内部地, HBase jar会去找它需要的依赖并将它们加入到MR工作配置中。See the source at TableMapReduceUtil#addDependencyJars(org.apache.hadoop.mapreduce.Job) for how this is done.

命令 hbase mapredcp 能帮你转储MR需要的CLASSPATH条目,同样jars TableMapReduceUtil#addDependencyJars会加入。你可以将它们一起放入HBase配置目录 HADOOP_CLASSPATH 中。那些不打包依赖或调用TableMapReduceUtil#addDependencyJars的工作,下面的命令结构式必须的:

  1. $ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`:${HBASE_HOME}/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(${HBASE_HOME}/bin/hbase mapredcp | tr ':' ',') ...

这个案例可能会出错如果你在HBase的build 目录而不是安装目录运行它。可能会有如下错误: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.mapreduce.RowCounter$RowCounterMapper 如果这个发生了,试着将命令改成如下,使它可以使用build环境中target/directory下的HBase jars $ HADOOP_CLASSPATH=${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar:${HBASE_BUILD_HOME}/bin/hbase classpath${HADOOP_HOME}/bin/hadoop jar ${HBASE_BUILD_HOME}/hbase-server/target/hbase-server-VERSION-SNAPSHOT.jar rowcounter usertable

0.96.1至0.98.4的Hbase MapReduce使用者请注意 某些使用Hbase的MR工作可能无法启动。The symptom is an exception similar to the following:

Exception in thread “main” java.lang.IllegalAccessError: class com.google.protobuf.ZeroCopyLiteralByteString cannot access its superclass com.google.protobuf.LiteralByteString at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:792) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:449) at java.net.URLClassLoader.access$100(URLClassLoader.java:71) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toScan(ProtobufUtil.java:818) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(TableMapReduceUtil.java:433) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:186) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:147) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:270) at org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(TableMapReduceUtil.java:100) …

这是由HBASE-9867版本引入的优化引起的,即引进了类加载依赖。

This affects both jobs using the -libjars option and “fat jar,” those which package their runtime dependencies in a nested lib folder.

要满足新类加载器的要求,hadoop的类路径必须包含hbase-protocol.jar。See 46.HBase, MapReduce, and the CLASSPATH for current recommendations for resolving classpath errors.

这个可以在系统范围内被解决通过将hbase-protocol.jar的索引添加到Hadoop的lib目录,通过符号链接或者将jar直接拷贝到目录中。

也可以每次job启动时通过将它包含进HADOOP_CLASSPATH环境变量中解决。当启动job时要打包依赖,下面所有三个job启动命令都可以:

  1. $ HADOOP_CLASSPATH=/path/to/hbase-protocol.jar:/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  2. $ HADOOP_CLASSPATH=$(hbase mapredcp):/path/to/hbase/conf hadoop jar MyJob.jar MyJobMainClass
  3. $ HADOOP_CLASSPATH=$(hbase classpath) hadoop jar MyJob.jar MyJobMainClass

For jars that do not package their dependencies, the following command structure is necessary:

  1. $ HADOOP_CLASSPATH=$(hbase mapredcp):/etc/hbase/conf hadoop jar MyApp.jar MyJobMainClass -libjars $(hbase mapredcp | tr ':' ',') ...

47.MapReduce Scan Caching MR扫描缓存

TableMapReduceUtil恢复了设置对传入扫描对象的缓存选项(返回客户端结果之前可以缓存的行数量),这个功能曾因为0.95版中的bug而被放弃,这个bug已经在0.98.5和0.96.3中解决了选择扫描缓存的优先级顺序如下:

  1. 扫描对象的缓存设置。
  2. 配置选项 hbase.client.scanner.caching中指定的缓存设置,可以在hbase-site.xml中手动设置也可以通过辅助方法TableMapReduceUtil.setScannerCaching()设置。
  3. HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING的默认值,100.

对缓存设置的优化是对客户端请求结果的等待时间和客户端需要获得结果数量之间的一个平衡。如果设置太大,客户端可能会因超时而结束。如果设置太小,扫描需要返回多块结果。如果把扫描想成一把铲子,大的缓存相当于一个大铲子,小铲子意味着为了填满bucket需要铲更多次。

上面提到的优先级列表允许您设置一个合理的默认值,并为特定的操作重写它。

48. Bundled HBase MapReduce Jobs

The HBase JAR also serves as a Driver for some bundled MapReduce jobs. To learn about the bundled MapReduce jobs, run the following command.

  1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar
  2. An example program must be given as the first argument.
  3. Valid program names are:
  4. copytable: Export a table from local cluster to peer cluster
  5. completebulkload: Complete a bulk data load.
  6. export: Write table data to HDFS.
  7. import: Import data written by Export.
  8. importtsv: Import data in TSV format.
  9. rowcounter: Count rows in HBase table

Each of the valid program names are bundled MapReduce jobs. To run one of the jobs, model your command after the following example.

  1. $ ${HADOOP_HOME}/bin/hadoop jar ${HBASE_HOME}/hbase-server-VERSION.jar rowcounter myTable

49.HBase作为一个MapReduce作业的数据源和数据接收器

HBase可以被用作mapreduce的数据源,TableInputFormat 和数据接收器,TableOutputFormat或MultiTableOutputFormat。写Mr作业,读或写HBase,最好是子类TableMapper and/or tableReducer。See the do-nothing pass-through classes IdentityTableMapper and IdentityTableReducer for basic usage. For a more involved example, see RowCounter or review the org.apache.hadoop.hbase.mapreduce.TestTableMapReduce unit test.

如果运行MR job时使用HBase作为数据源或数据接收器,需要在配置中指定源和结果表及列名。

当你从HBase中读时, TableInputFormat从HBase中请求regions表并制成map,或者是map-per-region或是mapreduce.job.maps map,无论哪个是更小的。如果job只有两个map,提高mapreduce.job.maps使其大于regions数量。如果你在每个regionServer节点上都运行了TaskTracker/NodeManager,Maps将运行在相邻的TaskTracker/NodeManager上。当向HBase写入时,可以避免Reduce步骤并从map中写回HBase。当任务不需要分类和整理,这种map发出数据的方法会管用。插入时, Hbase ‘sorts’中没有double-sorting(以及对你的MR集群shuffling data),除非你需要这个。如果不需要Reduce,map可能会发出处理的记录数来报告任务结束,或将Reduce数设为0,并使用TableOutputFormat。如果在你的案例中,有必要启动Reduce,通常你应该使用多个reducers,这样负载可以分布到整个HBase集群中。

HRegionPartitioner,HBase新的分割,可启动与regions同样数量的reducers.如果你的表格很大而上传不会大大改变现有region的数量,HRegionPartitioner很合适。 否则使用默认的分割方法。

50. 批量导入时直接写入HFiles

如果你在导入一张新的表格,可以忽略HBase API直接将内容写入文件系统,格式化成HBase数据文件(HFiles.)导入将运行得更快,也许是一个数量级的更快。

51. RowCounter Example

包括RowCounter的MR job使用TableInputFormat并记录指定表中行的数量。 To run it, use the following command:

  1. $ ./bin/hadoop jar hbase-X.X.X.jar

这将引用HBase MapReduce Driver类。在提供的选择中选择rowcounter。会将rowcounter使用建议打印到标准输出中。指定表名、计数的列以及输出目录。

52.map任务分解

52.1. 默认的HBase MapReduce分离器

在MapReduce job中使用TableInputFormat从HBase表中请求数据时,分离器将为表的每个region生成一个map任务。因此, 如果表中有100个regions这有100个map任务,不管扫描操作中选择了多少列族。

52.2. 自定义分离器

For those interested in implementing custom splitters, see the method getSplits in TableInputFormatBase. That is where the logic for map-task assignment resides.

53. HBase MapReduce Examples

53.1. HBase MapReduce Read Example

下面是以只读方式使用HBase作为MR数据源的例子。明确地,这里只有Mapper实例而没有Reducer,不会从Mapper中发出任何数据。

  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleRead");
  3. job.setJarByClass(MyReadJob.class); // class that contains mapper
  4. Scan scan = new Scan();
  5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
  6. scan.setCacheBlocks(false); // don't set to true for MR jobs
  7. // set other scan attrs
  8. ...
  9. TableMapReduceUtil.initTableMapperJob(
  10. tableName, // input HBase table name
  11. scan, // Scan instance to control CF and attribute selection
  12. MyMapper.class, // mapper
  13. null, // mapper output key
  14. null, // mapper output value
  15. job);
  16. job.setOutputFormatClass(NullOutputFormat.class); // because we aren't emitting anything from mapper
  17. boolean b = job.waitForCompletion(true);
  18. if (!b) {
  19. throw new IOException("error with job!");
  20. }

…mapper实例将extend TableMapper…

  1. public static class MyMapper extends TableMapper<Text, Text> {
  2. public void map(ImmutableBytesWritable row, Result value, Context context) throws InterruptedException, IOException {
  3. //process data for the row from the Result instance.
  4. }
  5. }

53.2. HBase MapReduce Read/Write Example

下面是个使用HBase即作为MR的数据源又作为数据接收端。这个例子只是将数据从一张表拷贝到另一张表。

  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleReadWrite");
  3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
  4. Scan scan = new Scan();
  5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
  6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
  7. //设置其他scan属性
  8. TableMapReduceUtil.intTableMapperJob(
  9. sourceTable, //input table
  10. scan, //Scan instance to control CF and attribute selection
  11. MyMapper.class, //mapper class
  12. null, //mapper output key
  13. null, //mapper output value
  14. job);
  15. TableMapReduceUtil.initTableReducerJob(
  16. targetTable, //output table
  17. null, //reducer class
  18. job);
  19. job.setNumReduceTasks(0);
  20. boolean b = job.waitForCompletion(true);
  21. if(!b){
  22. throw new IOException("error with job!");
  23. }

这里要解释下TableMapReduceUtil的工作内容,特别是对于reducer.TableOutputFormat,被用作outputFormat类,许多参数被在config上设置(如,TableOutputFormat.OUTPUT_TABLE ),同样设置reducer输出值为ImmutableBytesWritable ,reducer值为Writable.这些可以被programmer设置在job和conf上,但是TableMapReduceUtil试着使事情简单。

下面的mapper例子,将创建一个put操作,并匹配输入结果并发送它。注意这是CopyTable utility所完成的。

  1. public static class MyMapper extends TableMapper<ImmutableBytesWritable, Put> {
  2. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
  3. // this example is just copying the data from the source table...
  4. context.write(row, resultToPut(row,value));
  5. }
  6. private static Put resultToPut(ImmutableBytesWritable key, Result result) throws IOException {
  7. Put put = new Put(key.get());
  8. for (KeyValue kv : result.raw()) {
  9. put.add(kv);
  10. }
  11. return put;
  12. }
  13. }

事实上这里已经不存在reducer步骤,所以TableOutTable负责向目标表发送Put操作。

这仅仅是个例子,开发者可以选择不使用TableOutputFormat而自己连接目标表格。

53.3. HBase MapReduce Read/Write Example With Multi-Table Output

TODO: example for MultiTableOutputFormat.

53.4. HBase MapReduce Summary to HBase Example

下面的例子是使用HBase作为MR的数据源和数据接收端来完成一个总结步骤,这个例子将统计表中某个值的不同实例的数目。并将那些计数放入其他表中。​

  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config, "ExampleReadWrite");
  3. job.setJarByClass(MyReadWriteJob.class); //包含 mapper的类
  4. Scan scan = new Scan();
  5. scan.setCashing(500); //Scan中默认为1,对MR job来说不太好
  6. scan.setCasheBlocks(false); //对MR JOB不要设置为true
  7. //设置其他scan属性
  8. TableMapReduceUtil.initTableMapperJob(
  9. sourceTable, // input table
  10. scan, // Scan instance to control CF and attribute selection
  11. MyMapper.class, // mapper class
  12. Text.class, // mapper output key
  13. IntWritable.class, // mapper output value
  14. job);
  15. TableMapReduceUtil.initTableReducerJob(
  16. targetTable, // output table
  17. MyTableReducer.class, // reducer class
  18. job);
  19. job.setNumReduceTasks(1); // at least one, adjust as required
  20. boolean b = job.waitForCompletion(true);
  21. if(!b){
  22. throw new IOException("error with job");
  23. }

在这个例子中将字符串值映射为一列被选为总结值。This value is used as the key to emit from the mapper,and an IntWritable represents an instance counter.

  1. public static class MyMapper extends TableMapper<Text, IntWritable> {
  2. public static final byte[] CF = "cf".getBytes();
  3. public static final byte[] ATTR1 = "attr1".getBytes();
  4. private final IntWritable ONE = new IntWritable(1);
  5. private Text text = new Text();
  6. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
  7. String val = new String(value.getValue(CF, ATTR1));
  8. text.set(val); // we can only emit Writables...
  9. context.write(text, ONE);
  10. }
  11. }

在Reducer程序中, “ones”被累计(就像其他MR例子中那样),然后发出一个Put。

  1. public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
  2. public static final byte[] CF = "cf".getBytes();
  3. public static final byte[] COUNT = "count".getBytes();
  4. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  5. int i = 0;
  6. for (IntWritable val : values) {
  7. i += val.get();
  8. }
  9. Put put = new Put(Bytes.toBytes(key.toString()));
  10. put.add(CF, COUNT, Bytes.toBytes(i));
  11. context.write(null, put);
  12. }
  13. }

53.5 HBase MapReduce Summary to File Example

这个和上面总结的例子很像,不同的是这里使用Hbase作为MR数据源,而作为HDFS作为接收端。不同之处在任务设定和reducer中,mapper还是一样。

  1. Configuration config = HBaseConfiguration.create();
  2. Job job = new Job(config,"ExampleSummaryToFile");
  3. job.setJarByClass(MySummaryFileJob.class); // class that contains mapper and reducer
  4. Scan scan = new Scan();
  5. scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
  6. scan.setCacheBlocks(false); // don't set to true for MR jobs
  7. // set other scan attrs
  8. TableMapReduceUtil.initTableMapperJob(
  9. sourceTable, // input table
  10. scan, // Scan instance to control CF and attribute selection
  11. MyMapper.class, // mapper class
  12. Text.class, // mapper output key
  13. IntWritable.class, // mapper output value
  14. job);
  15. job.setReducerClass(MyReducer.class); // reducer class
  16. job.setNumReduceTasks(1); // at least one, adjust as required
  17. FileOutputFormat.setOutputPath(job, new Path("/tmp/mr/mySummaryFile")); // adjust directories as required
  18. boolean b = job.waitForCompletion(true);
  19. if (!b) {
  20. throw new IOException("error with job!");
  21. }

如上所述, 这个例子的Mapper和之前的一样。对于Reducer,使用”generic“代替继承TableMapper并发出puts

  1. public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. public void reduce(Text key, Iteralbe<IntWritable> values, Context context) throw IOException, InterruptedException{
  3. int i = 0;
  4. for (IntWritable val: values){
  5. i += val.get();
  6. }
  7. context.write(key, new IntWritable(i));
  8. }
  9. }

53.6 HBase MapReduce Summary to HBase Without Reducer

没有reducer也可以进行总结,如果使用Hbase作为reducer。

一个HBase目标表有必要为job总结而存在。Table方法incrementColumnValue将使用自增值。从性能角度,每个map任务保持值的Map与它的值一起增加很有意义,当mapper的cleanup方法时,每个key更新一次。然而, 你的mileage可能会有所不同,这取决于要处理的行数和唯一键。

最后,总结结果在Hbase中。

53.7 HBase MapReduce Summary to RDBMS

有时,产生总结放入RDBMS中更合适。对这些例子来说,自定义reducer产生总结直接到RDBMS中是可能的。setup方法可以连接到RDBMS上,cleanup方法可以断开这个连接。

理解job的reducer数量将影响总结的实现,你必须将这个设计到你的reducer中。特别地,不管是以singleton来运行还是multiple来运行reducer。正确或错误,根据用例。越多的reducer被布置到job中,越多的RDBMS同步连接会被建立,this will scale, but only to a point.(?)

  1. public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. private Connection c = null;
  3. public void setup(Context context) {
  4. //创建 DB连接。。。
  5. }
  6. public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  7. //do summarisation
  8. // 本例中Text是键, 但这只是个例子
  9. }
  10. public void cleanup(Context context){
  11. //关闭db连接
  12. }
  13. }

In the end, the summary results are written to your RDBMS table/s.

54. 在一个MR 任务中连接其他 Hbase 表

尽管当前框架内允许一个HBase表作为MR任务的输入,其他HBase表可作为查找表访问,在一个MR job中在Mapper的setup方法中创建一个表的实例。

  1. public class MyMapper extends TableMapper<Text,LongWritable> {
  2. public Talbe myOtherTable;
  3. public void setup(Context context) {
  4. //这里创建连接到集群上并保存下来或使用已存在表的连接
  5. myOtherTable = connection.getTable("myOtherTable");
  6. }
  7. public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException,InterruptedException {
  8. //处理结果
  9. //使用 ‘myOtherTable’查询
  10. }

55. 预测执行

通常建议使用HBase作为数据源的MR jobs关闭预测执行。这个可以通过配置在每个job基础上实现, 或者集群上实现。特别是对那些长时间运行的工作,预测执行将创建重复的map任务,这将你的数据写两遍到HBase中,这可能不是你想要的。

See spec.ex for more information.

56.Cascading 级联

级联是MR的可替换API,事实上还是使用MapReduce,但允许你以一种简单的方法写MapReduce代码。

下面是Cascading Flow的例子,将数据”汇入“Hbase集群。同样的hBaseTap API也可以被用来”源“数据。

  1. //从默认文件系统中读取数据
  2. //发出两个字段,:“offset偏移”和“line行”
  3. Tap source = new Hfs(new TextLine(), inputFileLhs);
  4. //在一个HBase集群中存储数据
  5. //接收字段:"num","lower"和"upper"
  6. //will automatically scope incoming fields to their proper familyname, "left" or "right"
  7. Fields keyFields = new Fields("num");
  8. String[] familyNames = {"left", "right"};
  9. Fields[] valueFields = new Fields[] {new Fields( "lower" ), new Fields( "upper" )};
  10. Tap hBaseTap = new HBaseTap( "multitable", new HBaseScheme( keyFields, familyNames, valueFields ), SinkMode.REPLACE );
  11. //分配一个简单的管道将输入解析成字段
  12. //现实应用可能会链接多个管道一起进行更复杂的处理
  13. Pipe parsePipe = new Each("insert", new Fields("line"), new RegexSplitter(new Fields("num", "lower","upper"), " "));
  14. //"plan"一个集群执行流
  15. //这将the source Tap and hBaseTap和parsePipe连接起来
  16. Flow parseFlow = new FlowConnector(properties).connect(source, hBaseTap, parsePipe);
  17. //start the flow, and block until complete
  18. parseFlow.complete();
  19. //打开Hbase表的迭代器来填充数据
  20. TupleEntryIterator iterator = parseFlow.openSink();
  21. while(iterator.hasNext())
  22. {
  23. //将每个tuple从HBase中打印出来
  24. System.out.println("iterator.next()="+ iterator.next());
  25. }
  26. iterator.close();