HBase 与 Spark

贡献者:TsingJyujing

Apache Spark 是一个分布式的、用于在内存中处理数据的软件框架,在许多场景中用于代替 MapReduce。

Spark 本身已经超出了本文档的范围,请参考 Spark 的项目及子项目的网站来获取更多信息。本文档将会集中在 4 个主要的 HBase 和 Spark 交互的要点上,这四点分别是:

基础 Spark

这可以在 Spark DAG 中的任意一点使用 HBase Connection。

Spark Streaming

这可以在 Spark Streaming 应用中的任意一点使用 HBase Connection。

Spark 批量加载

这可以允许在批量插入 HBase 的时候直接写 HBase 的 HFiles。

SparkSQL/DataFrames

这将提供为 HBase 中定义的表提供写 SparkSQL 的能力。

下面的部分将会用几个例子来说明上面几点交互。

104. 基础 Spark

这一部分将会在最底层和最简单的等级讨论 HBase 与 Spark 的整合。其他交互的要点都是基于这些操作构建的,我们会在这里完整描述。

一切 HBase 和 Spark 整合的基础都是 HBaseContext,HBaseContext 接受 HBase 配置并且会将其推送到 Spark 执行器(executor)中。这允许我们在每个 Spark 执行器(executor)中有一个静态的 HBase 连接。

作为参考,Spark 执行器(executor)既可以和 Region Server 在同一个节点,也可以在不同的节点,他们不存在共存的依赖关系。

可以认为每个 Spark 执行器(executor)都是一个多线程的客户端程序,这允许运行在不同的执行器上的 Spark 任务访问共享的连接对象。

例 31. HBaseContext 使用例程

这个例子展现了如何使用 Scala 语言在 RDD 的foreachPartition方法中使用 HBaseContext。

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. ...
  4. val hbaseContext = new HBaseContext(sc, config)
  5. rdd.hbaseForeachPartition(hbaseContext, (it, conn) => {
  6. val bufferedMutator = conn.getBufferedMutator(TableName.valueOf("t1"))
  7. it.foreach((putRecord) => {
  8. . val put = new Put(putRecord._1)
  9. . putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
  10. . bufferedMutator.mutate(put)
  11. })
  12. bufferedMutator.flush()
  13. bufferedMutator.close()
  14. })

这里是使用 Java 编写的同样的例子。

  1. JavaSparkContext jsc = new JavaSparkContext(sparkConf);
  2. try {
  3. List<byte[]> list = new ArrayList<>();
  4. list.add(Bytes.toBytes("1"));
  5. ...
  6. list.add(Bytes.toBytes("5"));
  7. JavaRDD<byte[]> rdd = jsc.parallelize(list);
  8. Configuration conf = HBaseConfiguration.create();
  9. JavaHBaseContext hbaseContext = new JavaHBaseContext(jsc, conf);
  10. hbaseContext.foreachPartition(rdd,
  11. new VoidFunction<Tuple2<Iterator<byte[]>, Connection>>() {
  12. public void call(Tuple2<Iterator<byte[]>, Connection> t)
  13. throws Exception {
  14. Table table = t._2().getTable(TableName.valueOf(tableName));
  15. BufferedMutator mutator = t._2().getBufferedMutator(TableName.valueOf(tableName));
  16. while (t._1().hasNext()) {
  17. byte[] b = t._1().next();
  18. Result r = table.get(new Get(b));
  19. if (r.getExists()) {
  20. mutator.mutate(new Put(b));
  21. }
  22. }
  23. mutator.flush();
  24. mutator.close();
  25. table.close();
  26. }
  27. });
  28. } finally {
  29. jsc.stop();
  30. }

所有的函数式都同时在 Spark 和 HBase 中,并且都支持用 Scala 或者 Java 开发。除了 SparkSQL 以外,所有 Spark 支持的语言在这里也都支持。
目前在余下的文档中,我们将会重点关注 Scala 的例程。

上面的例程阐释了如何在 foreachPartition 操作中使用连接。除此之外,许多 Spark 的基础函数都是支持的:

bulkPut

并行的写入大量数据到 HBase

bulkDelete

并行的删除 HBase 中大量数据

bulkGet

并行的从 HBase 中获取大量的数据,并且创建一个新的 RDD

mapPartition

在 Spark 的 Map 函数中使用连接对象,并且允许使用完整的 HBase 访问

hBaseRDD

简单的创建一个用于分布式扫描数据的 RDD

想要参看所有机能的例程,参见 HBase-Spark 模块。

105. Spark Streaming

Spark Streaming 是一个基于 Spark 构建的微批流处理框架。
HBase 和 Spark Streaming 的良好配合使得 HBase 可以提供一下益处:

  • 可以动态的获取参考或者描述性数据

  • 基于 Spark Streaming 提供的恰好一次处理,可以存储计数或者聚合结果

HBase-Spark 模块整合的和 Spark Streaming 的相关的点与 Spark 整合的点非常相似,
以下的指令可以在 Spark Streaming DStream 中立刻使用:

bulkPut

并行的写入大量数据到 HBase

bulkDelete

并行的删除 HBase 中大量数据

bulkGet

并行的从 HBase 中获取大量的数据,并且创建一个新的 RDD

mapPartition

在 Spark 的 Map 函数中使用连接对象,并且允许使用完整的 HBase 访问

hBaseRDD

简单的创建一个用于分布式扫描数据的 RDD。

例 32. bulkPut在 DStreams 中使用的例程

以下是 bulkPut 在 DStreams 中的使用例程,感觉上与 RDD 批量插入非常接近。

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. val ssc = new StreamingContext(sc, Milliseconds(200))
  5. val rdd1 = ...
  6. val rdd2 = ...
  7. val queue = mutable.Queue[RDD[(Array[Byte], Array[(Array[Byte],
  8. Array[Byte], Array[Byte])])]]()
  9. queue += rdd1
  10. queue += rdd2
  11. val dStream = ssc.queueStream(queue)
  12. dStream.hbaseBulkPut(
  13. hbaseContext,
  14. TableName.valueOf(tableName),
  15. (putRecord) => {
  16. val put = new Put(putRecord._1)
  17. putRecord._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
  18. put
  19. })

这里到hbaseBulkPut函数有三个输入,hbaseContext 携带了配置广播信息,来帮助我们连接到执行器中的 HBase Connections。
表名用于指明我们要往哪个表放数据。一个函数将 DStream 中的记录转换为 HBase Put 对象。

106. 批量加载

使用 Spark 加载大量的数据到 HBase 有两个选项。
基本的大量数据加载功能适用于你的行有数百万列数据,以及在 Spark 批量加载之前的 Map 操作列没有合并和分组的情况。

Spark 中还有一个轻量批量加载选项,这个第二选项设计给每一行少于一万的情况。
第二个选项的优势在于更高的吞吐量,以及 Spark 的 shuffle 操作中更轻的负载。

两种实现都或多或少的类似 MapReduce 批量加载过程,
因为分区器基于 Region 划分对行键进行分区。并且行键被顺序的发送到 Reducer
所以 HFile 可以在 reduce 阶段被直接写出。

依照 Spark 的术语来说,批量加载将会基于repartitionAndSortWithinPartitions实现,并且之后是 Spark 的foreachPartition

让我们首先看一下使用批量加载功能的例子

例 33. 批量加载例程

下面的例子展现了 Spark 中的批量加载。

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. val stagingFolder = ...
  5. val rdd = sc.parallelize(Array(
  6. (Bytes.toBytes("1"),
  7. (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  8. (Bytes.toBytes("3"),
  9. (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
  10. rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  11. t => {
  12. val rowKey = t._1
  13. val family:Array[Byte] = t._2(0)._1
  14. val qualifier = t._2(0)._2
  15. val value = t._2(0)._3
  16. val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
  17. Seq((keyFamilyQualifier, value)).iterator
  18. },
  19. stagingFolder.getPath)
  20. val load = new LoadIncrementalHFiles(config)
  21. load.doBulkLoad(new Path(stagingFolder.getPath),
  22. conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

hbaseBulkLoad 函数需要三个必要参数:

  1. 我们需要从之加载数据的表名

  2. 一个函数用于将 RDD 中的某个记录转化为一个元组形式的键值对。
    其中键值是一个 KeyFamilyQualifer 对象,值是 cell value。
    KeyFamilyQualifer 将会保存行键,列族和列标识位。
    针对行键的随机操作会根据这三个值来排序。

  3. 写出 HFile 的临时路径

接下来的 Spark 批量加载指令,使用 HBase 的 LoadIncrementalHFiles 对象来加载 HBase 中新创建的 HFiles。

使用 Spark 批量加载的附加参数

你可以在 hbaseBulkLoad 中用附加参数设置以下属性:

  • HFile 的最大文件大小
  • 从压缩中排除 HFile 的标志
  • 列族设置,包含 compression(压缩), bloomType(布隆(过滤器)类型), blockSize(块大小), and dataBlockEncoding(数据块编码)

例 34. 使用附加参数

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. val stagingFolder = ...
  5. val rdd = sc.parallelize(Array(
  6. (Bytes.toBytes("1"),
  7. (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  8. (Bytes.toBytes("3"),
  9. (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
  10. val familyHBaseWriterOptions = new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions]
  11. val f1Options = new FamilyHFileWriteOptions("GZ", "ROW", 128, "PREFIX")
  12. familyHBaseWriterOptions.put(Bytes.toBytes("columnFamily1"), f1Options)
  13. rdd.hbaseBulkLoad(TableName.valueOf(tableName),
  14. t => {
  15. val rowKey = t._1
  16. val family:Array[Byte] = t._2(0)._1
  17. val qualifier = t._2(0)._2
  18. val value = t._2(0)._3
  19. val keyFamilyQualifier= new KeyFamilyQualifier(rowKey, family, qualifier)
  20. Seq((keyFamilyQualifier, value)).iterator
  21. },
  22. stagingFolder.getPath,
  23. familyHBaseWriterOptions,
  24. compactionExclude = false,
  25. HConstants.DEFAULT_MAX_FILE_SIZE)
  26. val load = new LoadIncrementalHFiles(config)
  27. load.doBulkLoad(new Path(stagingFolder.getPath),
  28. conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

现在让我们来看一下如何调用轻量化对象批量加载的实现:

例 35. 使用轻量批量加载

  1. val sc = new SparkContext("local", "test")
  2. val config = new HBaseConfiguration()
  3. val hbaseContext = new HBaseContext(sc, config)
  4. val stagingFolder = ...
  5. val rdd = sc.parallelize(Array(
  6. ("1",
  7. (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
  8. ("3",
  9. (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))), ...
  10. rdd.hbaseBulkLoadThinRows(hbaseContext,
  11. TableName.valueOf(tableName),
  12. t => {
  13. val rowKey = t._1
  14. val familyQualifiersValues = new FamiliesQualifiersValues
  15. t._2.foreach(f => {
  16. val family:Array[Byte] = f._1
  17. val qualifier = f._2
  18. val value:Array[Byte] = f._3
  19. familyQualifiersValues +=(family, qualifier, value)
  20. })
  21. (new ByteArrayWrapper(Bytes.toBytes(rowKey)), familyQualifiersValues)
  22. },
  23. stagingFolder.getPath,
  24. new java.util.HashMap[Array[Byte], FamilyHFileWriteOptions],
  25. compactionExclude = false,
  26. 20)
  27. val load = new LoadIncrementalHFiles(config)
  28. load.doBulkLoad(new Path(stagingFolder.getPath),
  29. conn.getAdmin, table, conn.getRegionLocator(TableName.valueOf(tableName)))

注意在使用轻量行批量加载的时候函数返回的元组中:
第一个元素是行键,第二个元素是 FamiliesQualifiersValues,这个对象中含有这一行里所有的数值,并且包含了所有的列族。

107. SparkSQL/DataFrames

(HBase-Spark 中的)HBase-Spark 连接器 提供了:
DataSource API
(SPARK-3247)
在 Spark 1.2.0 的时候被引入,连接了简单的 HBase 的键值存储与复杂的关系型 SQL 查询,并且使得用户可以使用 Spark 在 HBase 上施展复杂的数据分析工作。
HBase Dataframe 是 Spark Dataframe 的一个标准,并且它允许和其他任何数据源——例如 Hive, Orc, Parquet, JSON 之类。
HBase-Spark Connector 使用的关键技术例如分区修剪,列修剪,推断后置以及数据本地化。

为了使用 HBase-Spark connector,用户需要定义 HBase 到 Spark 表中的映射目录。
准备数据并且填充 HBase 的表,然后将其加载到 HBase DataFrame 中去。
在此之后,用户可以使用 SQL 查询语句整合查询与数据获取。
接下来的例子说明了最基本的过程

107.1. 定义目录

  1. def catalog = s"""{
  2. |"table":{"namespace":"default", "name":"table1"},
  3. |"rowkey":"key",
  4. |"columns":{
  5. |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  6. |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
  7. |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
  8. |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
  9. |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
  10. |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
  11. |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
  12. |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
  13. |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
  14. |}
  15. |}""".stripMargin

目录定义了从 HBase 到 Spark 表的一个映射。
这个目录中有两个关键部分。
第一个是行键的定义,另一个是将 Spark 表中的列映射到 HBase 的列族和列标识位。
上面的 schema 定义了一个 HBase 中的表,名为 Table1,行键作为键与一些列(col1 - col8)。
注意行键也需要被定义为一个列(col0),该列具有特定的列族(rowkey)。

107.2. 保存 DataFrame

  1. case class HBaseRecord(
  2. col0: String,
  3. col1: Boolean,
  4. col2: Double,
  5. col3: Float,
  6. col4: Int,
  7. col5: Long,
  8. col6: Short,
  9. col7: String,
  10. col8: Byte)
  11. object HBaseRecord
  12. {
  13. def apply(i: Int, t: String): HBaseRecord = {
  14. val s = s"""row${"%03d".format(i)}"""
  15. HBaseRecord(s,
  16. i % 2 == 0,
  17. i.toDouble,
  18. i.toFloat,
  19. i,
  20. i.toLong,
  21. i.toShort,
  22. s"String$i: $t",
  23. i.toByte)
  24. }
  25. }
  26. val data = (0 to 255).map { i => HBaseRecord(i, "extra")}
  27. sc.parallelize(data).toDF.write.options(
  28. Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  29. .format("org.apache.hadoop.hbase.spark ")
  30. .save()

用户准备的数据(data)是一个本地的 Scala 集合,含有 256 个 HBaseRecord 对象。
sc.parallelize(data) 函数分发了从 RDD 中来的数据。toDF返回了一个 DataFrame。
write 函数返回了一个 DataFrameWriter 来将 DataFrame 中的数据到外部存储(例如这里是 HBase)。
save 函数将会创建一个具有 5 个 Region 的 HBase 表来在内部保存 DataFrame。

107.3. 加载 DataFrame

  1. def withCatalog(cat: String): DataFrame = {
  2. sqlContext
  3. .read
  4. .options(Map(HBaseTableCatalog.tableCatalog->cat))
  5. .format("org.apache.hadoop.hbase.spark")
  6. .load()
  7. }
  8. val df = withCatalog(catalog)

在 withCatalog 函数中,sqlContext 是一个 SQLContext 的变量,是一个用于与 Spark 中结构化(行与列)的数据一起工作的一个入口点。
read 返回一个 DataFrameReader,他可以用于从 DataFrame 中读取数据。option函数为输出到 DataFrameReader 的底层的数据源增加了输入选项。
以及,format函数表示了 DataFrameReader 的输入数据源的格式。 load() 函数将其加载为一个 DataFrame, 数据帧 df将由withCatalog函数返回,用于访问 HBase 表,例如 4.4 与 4.5.

107.4. Language Integrated Query

  1. val s = df.filter(($"col0" <= "row050" && $"col0" > "row040") ||
  2. $"col0" === "row005" ||
  3. $"col0" <= "row005")
  4. .select("col0", "col1", "col4")
  5. s.show

DataFrame 可以做很多操作,例如 join, sort, select, filter, orderBy 等等等等。df.filter 通过指定的 SQL 表达式提供过滤器,select选择一系列的列:col0, col1col4

107.5. SQL 查询

  1. df.registerTempTable("table1")
  2. sqlContext.sql("select count(col1) from table1").show

registerTempTable 注册了一个名为 df 的 DataFrame 作为临时表,表名为table1,临时表的生命周期和 SQLContext 有关,用于创建df
sqlContext.sql函数允许用户执行 SQL 查询。

107.6. Others

例 36. 查询不同的时间戳

在 HBaseSparkConf 中,可以设置 4 个和时间戳有关的参数,它们分别表示为 TIMESTAMP, MIN_TIMESTAMP, MAX_TIMESTAMP 和 MAX_VERSIONS。用户可以使用不同的时间戳或者利用 MIN_TIMESTAMP 和 MAX_TIMESTAMP 查询时间范围内的记录。同时,下面的例子使用了具体的数值取代了 tsSpecified 和 oldMs。

下例展示了如何使用不同的时间戳加载 df DataFrame。tsSpecified 由用户定义,HBaseTableCatalog 定义了 HBase 和 Relation 关系的 schema。writeCatalog 定义了 schema 映射的目录。

  1. val df = sqlContext.read
  2. .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMESTAMP -> tsSpecified.toString))
  3. .format("org.apache.hadoop.hbase.spark")
  4. .load()

下例展示了如何使用不同的时间范围加载 df DataFrame。oldMs 由用户定义。

  1. val df = sqlContext.read
  2. .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
  3. HBaseSparkConf.MAX_TIMESTAMP -> oldMs.toString))
  4. .format("org.apache.hadoop.hbase.spark")
  5. .load()

在加载 DataFrame 之后,用户就可以查询数据。

  1. df.registerTempTable("table")
  2. sqlContext.sql("select count(col1) from table").show

例 37. 原生 Avro 支持

Example 37. Native Avro support

HBase-Spark Connector 支持不同类型的数据格式例如 Avro, Jason 等等。下面的用例展示了 Spark 如何支持 Avro。用户可以将 Avro 记录直接持久化进 HBase。
在内部,Avro schema 自动的转换为原生的 Spark Catalyst 数据类型。
注意,HBase 表中无论是键或者值的部分都可以在 Avro 格式定义。

1) 为 schema 映射定义目录:

  1. def catalog = s"""{
  2. |"table":{"namespace":"default", "name":"Avrotable"},
  3. |"rowkey":"key",
  4. |"columns":{
  5. |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  6. |"col1":{"cf":"cf1", "col":"col1", "type":"binary"}
  7. |}
  8. |}""".stripMargin

catalog是一个 HBase 表的 schema,命名为 Avrotable。行键作为键,并且有一个列 col1。行键也被定义为详细的一列(col0),并且指定列族(rowkey)。

2) 准备数据:

  1. object AvroHBaseRecord {
  2. val schemaString =
  3. s"""{"namespace": "example.avro",
  4. | "type": "record", "name": "User",
  5. | "fields": [
  6. | {"name": "name", "type": "string"},
  7. | {"name": "favorite_number", "type": ["int", "null"]},
  8. | {"name": "favorite_color", "type": ["string", "null"]},
  9. | {"name": "favorite_array", "type": {"type": "array", "items": "string"}},
  10. | {"name": "favorite_map", "type": {"type": "map", "values": "int"}}
  11. | ] }""".stripMargin
  12. val avroSchema: Schema = {
  13. val p = new Schema.Parser
  14. p.parse(schemaString)
  15. }
  16. def apply(i: Int): AvroHBaseRecord = {
  17. val user = new GenericData.Record(avroSchema);
  18. user.put("name", s"name${"%03d".format(i)}")
  19. user.put("favorite_number", i)
  20. user.put("favorite_color", s"color${"%03d".format(i)}")
  21. val favoriteArray = new GenericData.Array[String](2, avroSchema.getField("favorite_array").schema())
  22. favoriteArray.add(s"number${i}")
  23. favoriteArray.add(s"number${i+1}")
  24. user.put("favorite_array", favoriteArray)
  25. import collection.JavaConverters._
  26. val favoriteMap = Map[String, Int](("key1" -> i), ("key2" -> (i+1))).asJava
  27. user.put("favorite_map", favoriteMap)
  28. val avroByte = AvroSedes.serialize(user, avroSchema)
  29. AvroHBaseRecord(s"name${"%03d".format(i)}", avroByte)
  30. }
  31. }
  32. val data = (0 to 255).map { i =>
  33. AvroHBaseRecord(i)
  34. }

首先定义 schemaString,然后它被解析来获取avroSchemaavroSchema是用来生成 AvroHBaseRecord的。data 由用户准备,是一个有 256 个AvroHBaseRecord对象的原生 Scala 集合。

3) 保存 DataFrame:

  1. sc.parallelize(data).toDF.write.options(
  2. Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  3. .format("org.apache.spark.sql.execution.datasources.hbase")
  4. .save()

对于由 schema catalog提供的已有的数据帧,上述语句将会创建一个具有 5 个分区的 HBase 表,并且将数据存进去。

4) 加载 DataFrame

  1. def avroCatalog = s"""{
  2. |"table":{"namespace":"default", "name":"avrotable"},
  3. |"rowkey":"key",
  4. |"columns":{
  5. |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
  6. |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
  7. |}
  8. |}""".stripMargin
  9. def withCatalog(cat: String): DataFrame = {
  10. sqlContext
  11. .read
  12. .options(Map("avroSchema" -> AvroHBaseRecord.schemaString, HBaseTableCatalog.tableCatalog -> avroCatalog))
  13. .format("org.apache.spark.sql.execution.datasources.hbase")
  14. .load()
  15. }
  16. val df = withCatalog(catalog)

withCatalog 函数中,read 会返回一个可以将数据读取成 DataFrame 格式的 DataFrameReader。
option 函数追加输入选项来指定 DataFrameReader 使用的底层数据源。这里有两个选项,一个是设置avroSchemaAvroHBaseRecord.schemaString,另一个是设置HBaseTableCatalog.tableCatalogavroCatalogload() 函数加载所有的数据为 DataFrame。数据帧 dfwithCatalog 函数返回,可用于访问 HBase 表中的数据。

5) SQL 查询

  1. df.registerTempTable("avrotable")
  2. val c = sqlContext.sql("select count(1) from avrotable").

在加载 df DataFrame 之后,用户可以查询数据。registerTempTable 将 df DataFrame 注册为一个临时表,表名为 avrotable。
sqlContext.sql函数允许用户执行 SQL 查询。