HBase SQL Connector

Scan Source: Bounded | Lookup Source: Sync Mode | Sink: Batch | Sink: Streaming Upsert Mode

The HBase connector allows for reading from and writing to an HBase cluster. This document describes how to set up the HBase connector to run SQL queries against HBase.

HBase always works in upsert mode, exchanging changelog messages with the external system using a primary key defined in the DDL. The primary key must be defined on the HBase rowkey field (the rowkey field must be declared). If the PRIMARY KEY clause is not declared, the HBase connector takes the rowkey as the primary key by default.
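
For instance, the following two declarations yield the same primary key; a minimal sketch (table and column names are hypothetical):

  -- explicit primary key on the rowkey field
  CREATE TABLE hTable1 (
    rowkey STRING,
    cf ROW<q STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 't1',
    'zookeeper.quorum' = 'localhost:2181'
  );

  -- no PRIMARY KEY clause: the connector takes the rowkey field as the primary key by default
  CREATE TABLE hTable2 (
    rowkey STRING,
    cf ROW<q STRING>
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 't2',
    'zookeeper.quorum' = 'localhost:2181'
  );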

Dependencies

To set up the HBase connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

HBase Version | Maven dependency           | SQL Client JAR
1.4.x         | flink-connector-hbase_2.11 | Unsupported

Note: To use the HBase connector in SQL Client or on a Flink cluster, it is highly recommended to add the HBase dependency jars to the Hadoop classpath. Flink automatically loads all jars on the Hadoop classpath; refer to HBase, MapReduce, and the CLASSPATH for how to add the HBase dependency jars to the Hadoop classpath.

How to use an HBase table

All the column families in an HBase table must be declared as ROW type: the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema; users can declare only what is used in the query. Apart from the ROW type fields, the single atomic type field (e.g. STRING, BIGINT) will be recognized as the HBase rowkey. The rowkey field can have an arbitrary name, but it should be quoted with backticks if it is a reserved keyword.

  -- register the HBase table 'mytable' in Flink SQL
  CREATE TABLE hTable (
    rowkey INT,
    family1 ROW<q1 INT>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
  );

  -- use the ROW(...) construction function to build column families and write data into the HBase table.
  -- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
  INSERT INTO hTable
  SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;

  -- scan data from the HBase table
  SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;

  -- temporal join the HBase table as a dimension table
  SELECT * FROM myTopic
  LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
  ON myTopic.key = hTable.rowkey;
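
If the rowkey field's name collides with a reserved keyword, quote it with backticks as noted above; a minimal sketch (the table name and schema are illustrative):

  CREATE TABLE hTableKeyed (
    `timestamp` BIGINT,   -- rowkey column named with a reserved keyword, so it is backtick-quoted
    family1 ROW<q1 STRING>,
    PRIMARY KEY (`timestamp`) NOT ENFORCED
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mytable2',
    'zookeeper.quorum' = 'localhost:2181'
  );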

Connector Options

connector
  required | default: (none) | type: String
  Specify what connector to use; here it should be 'hbase-1.4'.

table-name
  required | default: (none) | type: String
  The name of the HBase table to connect to.

zookeeper.quorum
  required | default: (none) | type: String
  The HBase ZooKeeper quorum.

zookeeper.znode.parent
  optional | default: /hbase | type: String
  The root dir in ZooKeeper for the HBase cluster.

null-string-literal
  optional | default: null | type: String
  Representation for null values for string fields. The HBase source and sink encode/decode empty bytes as null values for all types except the string type.

sink.buffer-flush.max-size
  optional | default: 2mb | type: MemorySize
  Writing option, maximum size in memory of buffered rows for each writing request. This can improve performance for writing data to HBase, but may increase the latency. Can be set to '0' to disable it.

sink.buffer-flush.max-rows
  optional | default: 1000 | type: Integer
  Writing option, maximum number of rows to buffer for each writing request. This can improve performance for writing data to HBase, but may increase the latency. Can be set to '0' to disable it.

sink.buffer-flush.interval
  optional | default: 1s | type: Duration
  Writing option, the interval to flush any buffered rows. This can improve performance for writing data to HBase, but may increase the latency. Can be set to '0' to disable it. Note: both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' can be set to '0' with the flush interval set, allowing for complete async processing of buffered actions.
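
As an illustration, the sketch below combines these writing options in one sink table; the table name, schema, and chosen values are examples, not recommendations:

  CREATE TABLE hSinkTable (
    rowkey STRING,
    family1 ROW<q1 STRING, q2 BIGINT>,
    PRIMARY KEY (rowkey) NOT ENFORCED
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mysinktable',
    'zookeeper.quorum' = 'localhost:2181',
    'zookeeper.znode.parent' = '/hbase',
    'null-string-literal' = 'n/a',          -- representation used for null string values
    'sink.buffer-flush.max-size' = '4mb',
    'sink.buffer-flush.max-rows' = '5000',
    'sink.buffer-flush.interval' = '2s'
  );

With these settings the sink flushes a write request whenever the buffer reaches 4mb or 5000 rows, and at least every 2 seconds.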

Data Type Mapping

HBase stores all data as byte arrays. The data needs to be serialized and deserialized during read and write operations.

When serializing and deserializing, the Flink HBase connector uses the utility class org.apache.hadoop.hbase.util.Bytes provided by HBase (Hadoop) to convert Flink data types to and from byte arrays.

The Flink HBase connector encodes null values to empty bytes, and decodes empty bytes to null values, for all data types except the string type. For the string type, the null literal is determined by the null-string-literal option.

The data type mappings are as follows:

Flink SQL type, followed by its HBase conversion:

CHAR / VARCHAR / STRING
  byte[] toBytes(String s)
  String toString(byte[] b)

BOOLEAN
  byte[] toBytes(boolean b)
  boolean toBoolean(byte[] b)

BINARY / VARBINARY
  Returns byte[] as is.

DECIMAL
  byte[] toBytes(BigDecimal v)
  BigDecimal toBigDecimal(byte[] b)

TINYINT
  new byte[] { val }
  bytes[0] // returns first and only byte from bytes

SMALLINT
  byte[] toBytes(short val)
  short toShort(byte[] bytes)

INT
  byte[] toBytes(int val)
  int toInt(byte[] bytes)

BIGINT
  byte[] toBytes(long val)
  long toLong(byte[] bytes)

FLOAT
  byte[] toBytes(float val)
  float toFloat(byte[] bytes)

DOUBLE
  byte[] toBytes(double val)
  double toDouble(byte[] bytes)

DATE
  Stores the number of days since epoch as int value.

TIME
  Stores the number of milliseconds of the day as int value.

TIMESTAMP
  Stores the milliseconds since epoch as long value.

ARRAY
  Not supported

MAP / MULTISET
  Not supported

ROW
  Not supported
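
To see the mapping in context, here is a sketch of a table whose columns exercise several of the conversions above (the schema is hypothetical):

  CREATE TABLE hTypedTable (
    rowkey STRING,              -- toBytes(String) / toString(byte[])
    meta ROW<
      flag BOOLEAN,             -- toBytes(boolean) / toBoolean(byte[])
      amount DECIMAL(10, 2),    -- toBytes(BigDecimal) / toBigDecimal(byte[])
      created DATE,             -- days since epoch as int
      updated TIMESTAMP(3)      -- milliseconds since epoch as long
    >,
    PRIMARY KEY (rowkey) NOT ENFORCED
  ) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'typed',
    'zookeeper.quorum' = 'localhost:2181'
  );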
