HBase and Spark

Apache Spark是一个用于分布式处理数据的软件框架,在许多情况下被用来替换MapReduce使用。

spark本身非本文讨论范围,请参考Spark site来获取更多关于Spark的项目。这里主要讨论Hbase和Spark4个方面的交互。

Basic Spark

在Spark DAG 中任意点连接HBase的能力。

Spark Streaming

在Spark streaming应用中连接HBase的能力。

Spark Bulk Load

直接写入HBase HFiles的能力,从而实现批量插入HBase。

SparkSQL/DataFrames

写SparkSQL的能力,即在HBase中绘制表格的能力

83. Basic Spark

本节讨论Spark、HBase最低层次上的整合。所有其他交互都是建立在这里将描述的概念之上。

HBaseContext是所有Spark和HBase整合的基础。HbaseContext获取HBase配置并将它们放入Spark executors中。这使每个Spark Executor在静态位置上都有一个HBase连接。

例如,Spark Executor可以和Region Server在同一节点上或在不同节点上,这里没有共同位置的依赖。每个Spark Excutor都是一个多线程任务的客户端应用。这使任何运行在executor上的Spark任务连接共享连接对象。

Example 46. HBaseContext Usage Example

This example shows how HBaseContext can be used to do a foreachPartition on a RDD in Scala:

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  5. val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
  6. it.foreach((putRecord) => {
  7. . val put = new Put(putRecord._1)
  8. . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
  9. . bufferedMutator.mutate(put)
  10. })
  11. bufferedMutator.flush()
  12. bufferedMutator.close()
  13. })

Here is the same example implemented in Java:

  1. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  2. try {
  3. List<byte[]> list = new ArrayList<>();
  4. list.add(Bytes.toBytes("1"));
  5. list.add(Bytes.toBytes("5"));
  6. JavaRDD<byte[]> rdd = jsc.parallelize(list);
  7. Configuration conf = HBaseConfiguration.create();
  8. JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
  9. hbaseContext.foreachPartition(rdd,
  10. new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
  11. public void call(Tuple2<Iterator<byte[]>, Connection> t)
  12. throws Exception {
  13. Table table = t._2().getTable(TableName.valueOf(tableName));
  14. BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
  15. while (t._1().hasNext()) {
  16. byte[] b = t._1().next();
  17. Result r = table.get(new Get(b));
  18. if (r.getExists()) {
  19. mutator.mutate(new Put(b));
  20. }
  21. }
  22. mutator.flush();
  23. mutator.close();
  24. table.close();
  25. }
  26. });
  27. } finally {
  28. jsc.stop();
  29. }

All functionality between Spark and HBase will be supported both in Scala and in Java, with the exception of SparkSQL which will support any language that is supported by Spark. For the remaining of this documentation we will focus on Scala examples for now.

The examples above illustrate how to do a foreachPartition with a connection. A number of other Spark base functions are supported out of the box:

bulkPut

大规模并发发送puts到HBase中

bulkDelete

大规模并发送deletes到HBase中

bulkGet

大规模并发发送gets到HBase来创建新的的RDD

mapPartition

执行Connection对象的Spark Map函数来允许全连接至HBase

hBaseRDD

简化分布式scan来创建一个RDD

84. Spark Streaming

Spark streaming是建立在Spark顶端的微型批量流处理框架。Hbase和Spark streaming可以很好的结合,Hbase可为HBase streaming提供一下帮助

  • 一个用来抓取参考数据或概要数据的地方
  • A place to store counts or aggregates in a way that supports Spark Streaming promise of only once processing.

HBase-Spark结合的Spark-streaming方法与普通的Spark结合方法类似,in that the following commands are possible straight off a Spark Streaming DStream.

bulkPut

大规模并发发送puts到HBase中

bulkDelete

大规模并发送deletes到HBase中

bulkGet

大规模并发发送gets到HBase来创建新的的RDD

mapPartition

执行Connection对象的Spark Map函数来允许全连接至HBase

hBaseRDD

简化分布式scan来创建一个RDD

Example 47. bulkPut Example with DStreams

Below is an example of bulkPut with DStreams. It is very close in feel to the RDD bulk put.

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. val ssc = new StreamingContext(sc, Milliseconds(200))
  5. val rdd1 =
  6. val rdd2 =
  7. val queue = mutable.QueueRDD[(Array[Byte], Array[(Array[Byte],
  8. Array[Byte], Array[Byte])])]
  9. queue += rdd1
  10. queue += rdd2
  11. val dStream = ssc.queueStream(queue)
  12. dStream.hbaseBulkPut(
  13. hbaseContext,
  14. TableName.valueOf(tableName),
  15. (putRecord) => {
  16. val put = new Put(putRecord._1)
  17. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
  18. put
  19. })

hbaseBulkPut 函数有三个输入。hbaseContext,持有配置信息在executor中连接到HBase connections。 输入数据的表名。一个将DStream中的记录转为HBase Put对象的函数

85. Bulk Load

有两种方法使用Spark将数据批量加载到HBase中。基础的批量加载功能适用于行具有许多列的情况以及列不固定的情况。

还有一个Spark选项来记录批量加载的。这种情况是为表格每行少于10000列的情况。第二种方法的优势是在Spark shuffle操作时更多的吞吐和更少的负载。

实现工作或多或少类似于MapReduce批量加载流程,分区工具在区域分割的基础上分割行键,并按序将行键送入reducer中。