Kafka Connector

Overview

This connector allows the use of Apache Kafka topics as tables in openLooKeng. Each message is presented as a row in openLooKeng.

Topics can be live: rows will appear as data arrives and disappear as segments get dropped. This can result in strange behavior if accessing the same table multiple times in a single query (e.g., performing a self join).

Note

The minimum supported Kafka broker version is 0.10.0.

Configuration

To configure the Kafka connector, create a catalog properties file etc/catalog/kafka.properties with the following contents, replacing the properties as appropriate:

  connector.name=kafka
  kafka.table-names=table1,table2
  kafka.nodes=host1:port,host2:port

Multiple Kafka Clusters

You can have as many catalogs as you need, so if you have additional Kafka clusters, simply add another properties file to etc/catalog with a different name (making sure it ends in .properties). For example, if you name the property file sales.properties, openLooKeng will create a catalog named sales using the configured connector.

Configuration Properties

The following configuration properties are available:

Property Name                Description
kafka.table-names            List of all tables provided by the catalog
kafka.default-schema         Default schema name for tables
kafka.nodes                  List of nodes in the Kafka cluster
kafka.connect-timeout        Timeout for connecting to the Kafka cluster
kafka.buffer-size            Kafka read buffer size
kafka.table-description-dir  Directory containing topic description files
kafka.hide-internal-columns  Controls whether internal columns are part of the table schema or not

kafka.table-names

Comma-separated list of all tables provided by this catalog. A table name can be unqualified (simple name) and will be put into the default schema (see below) or qualified with a schema name (<schema-name>.<table-name>).

For each table defined here, a table description file (see below) may exist. If no table description file exists, the table name is used as the topic name on Kafka and no data columns are mapped into the table. The table will still contain all internal columns (see below).

This property is required; there is no default and at least one table must be defined.
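
The naming rule above can be sketched in Python. This is only an illustration of how qualified and unqualified names resolve, not the connector's implementation; the function name resolve_table_names is hypothetical, and skipping empty entries and splitting on the first dot are assumptions.

```python
def resolve_table_names(table_names, default_schema="default"):
    """Split a kafka.table-names value into (schema, table) pairs."""
    resolved = []
    for entry in table_names.split(","):
        entry = entry.strip()
        if not entry:
            continue  # assumption: ignore empty entries such as a trailing comma
        if "." in entry:
            # Qualified name: <schema-name>.<table-name>
            schema, table = entry.split(".", 1)
        else:
            # Unqualified name: goes into the default schema
            schema, table = default_schema, entry
        resolved.append((schema, table))
    if not resolved:
        raise ValueError("kafka.table-names must define at least one table")
    return resolved


print(resolve_table_names("table1,sales.orders"))
# [('default', 'table1'), ('sales', 'orders')]
```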

kafka.default-schema

Defines the schema which will contain all tables that were defined without a qualifying schema name.

This property is optional; the default is default.

kafka.nodes

A comma-separated list of hostname:port pairs for the Kafka data nodes.

This property is required; there is no default and at least one node must be defined.

Note

openLooKeng must still be able to connect to all nodes of the cluster even if only a subset is specified here, because segment files may be located only on a specific node.

kafka.connect-timeout

Timeout for connecting to a data node. A busy Kafka cluster may take quite some time before accepting a connection; if queries fail because of timeouts, increasing this value is a good strategy.

This property is optional; the default is 10 seconds (10s).

kafka.buffer-size

Size of the internal data buffer for reading data from Kafka. The data buffer must be able to hold at least one message and ideally can hold many messages. There is one data buffer allocated per worker and data node.

This property is optional; the default is 64kb.

kafka.table-description-dir

References a folder within the openLooKeng deployment that holds one or more table description files in JSON format (file names must end with .json).

This property is optional; the default is etc/kafka.

kafka.hide-internal-columns

In addition to the data columns defined in a table description file, the connector maintains a number of additional columns for each table. If these columns are hidden, they can still be used in queries but do not show up in DESCRIBE <table-name> or SELECT *.

This property is optional; the default is true.

Internal Columns

For each defined table, the connector maintains the following columns:

Column name        Type     Description
_partition_id      BIGINT   ID of the Kafka partition which contains this row.
_partition_offset  BIGINT   Offset within the Kafka partition for this row.
_segment_start     BIGINT   Lowest offset in the segment (inclusive) which contains this row. This offset is partition specific.
_segment_end       BIGINT   Highest offset in the segment (exclusive) which contains this row. The offset is partition specific. This is the same value as _segment_start of the next segment (if it exists).
_segment_count     BIGINT   Running count for the current row within the segment. For an uncompacted topic, _segment_start + _segment_count is equal to _partition_offset.
_message_corrupt   BOOLEAN  True if the decoder could not decode the message for this row. When true, data columns mapped from the message should be treated as invalid.
_message           VARCHAR  Message bytes as a UTF-8 encoded string. This is only useful for a text topic.
_message_length    BIGINT   Number of bytes in the message.
_key_corrupt       BOOLEAN  True if the key decoder could not decode the key for this row. When true, data columns mapped from the key should be treated as invalid.
_key               VARCHAR  Key bytes as a UTF-8 encoded string. This is only useful for textual keys.
_key_length        BIGINT   Number of bytes in the key.

For tables without a table definition file, the _key_corrupt and _message_corrupt columns will always be false.

Table Definition Files

Kafka maintains topics only as byte messages and leaves it to producers and consumers to define how a message should be interpreted. For openLooKeng, this data must be mapped into columns to allow queries against the data.

Note

For textual topics that contain JSON data, it is entirely possible to not use any table definition files, but instead use the openLooKeng JSON functions to parse the _message column, which contains the message bytes mapped into a UTF-8 string. This is, however, cumbersome and makes it difficult to write SQL queries.

A table definition file consists of a JSON definition for a table. The name of the file can be arbitrary but must end in .json.

  {
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
      "dataFormat": ...,
      "fields": [
        ...
      ]
    },
    "message": {
      "dataFormat": ...,
      "fields": [
        ...
      ]
    }
  }

Field       Required  Type         Description
tableName   required  string       openLooKeng table name defined by this file.
schemaName  optional  string       Schema which will contain the table. If omitted, the default schema name is used.
topicName   required  string       Kafka topic that is mapped.
key         optional  JSON object  Field definitions for data columns mapped to the message key.
message     optional  JSON object  Field definitions for data columns mapped to the message itself.
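
As a rough sketch, a table definition file could be generated with a few lines of Python. The table, schema, topic, and column names below are hypothetical; only the JSON field names come from the table above. Naming the file after the table is a choice made here for convenience, since any name ending in .json is accepted.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical example values; only the JSON field names come from the text.
definition = {
    "tableName": "orders",
    "schemaName": "sales",
    "topicName": "shop.orders",
    "message": {
        "dataFormat": "json",
        "fields": [
            {"name": "order_id", "type": "BIGINT", "mapping": "id"},
            {"name": "customer", "type": "VARCHAR", "mapping": "customer/name"},
        ],
    },
}


def write_table_definition(definition, directory):
    """Check the required top-level fields and write <tableName>.json."""
    for required in ("tableName", "topicName"):
        if required not in definition:
            raise ValueError(f"missing required field: {required}")
    path = Path(directory) / f"{definition['tableName']}.json"
    path.write_text(json.dumps(definition, indent=2))
    return path


# A temporary directory stands in for the kafka.table-description-dir folder.
with tempfile.TemporaryDirectory() as tmp:
    written = write_table_definition(definition, tmp)
    print(written.name)  # orders.json
```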

Key and Message in Kafka

Starting with Kafka 0.8, each message in a topic can have an optional key. A table definition file contains sections for both key and message to map the data onto table columns.

Each of the key and message fields in the table definition is a JSON object that must contain two fields:

Field       Required  Type        Description
dataFormat  required  string      Selects the decoder for this group of fields.
fields      required  JSON array  A list of field definitions. Each field definition creates a new column in the openLooKeng table.

Each field definition is a JSON object:

  {
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
  }

Field       Required  Type     Description
name        required  string   Name of the column in the openLooKeng table.
type        required  string   openLooKeng type of the column.
dataFormat  optional  string   Selects the column decoder for this field. Defaults to the default decoder for this row data format and column type.
dataSchema  optional  string   The path or URL where the Avro schema resides. Used only for the Avro decoder.
mapping     optional  string   Mapping information for the column. This is decoder specific, see below.
formatHint  optional  string   Sets a column-specific format hint for the column decoder.
hidden      optional  boolean  Hides the column from DESCRIBE <table-name> and SELECT *. Defaults to false.
comment     optional  string   Adds a column comment which is shown with DESCRIBE <table-name>.

There is no limit on field descriptions for either key or message.
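
A small lint-style check can catch mistakes in field definitions before a file is deployed. The sketch below is a hypothetical helper, not part of openLooKeng; the attribute names are taken from the field table above, and treating unknown attributes as problems is an assumption made for this example rather than documented connector behavior.

```python
# Attribute names are taken from the field table above.
REQUIRED = {"name", "type"}
OPTIONAL = {"dataFormat", "dataSchema", "mapping", "formatHint", "hidden", "comment"}


def check_field_definition(field):
    """Return a list of problems found in one field definition (empty if none)."""
    keys = set(field)
    problems = [f"missing required attribute: {k}" for k in sorted(REQUIRED - keys)]
    # Assumption: unknown attributes are flagged, since they are usually typos.
    problems += [f"unknown attribute: {k}" for k in sorted(keys - REQUIRED - OPTIONAL)]
    if "hidden" in field and not isinstance(field["hidden"], bool):
        problems.append("hidden must be a boolean")
    return problems


# Hypothetical field definitions for illustration.
good = {"name": "order_id", "type": "BIGINT", "mapping": "id", "comment": "Order key"}
bad = {"name": "price", "maping": "price"}  # misspelled attribute, no type

print(check_field_definition(good))  # []
print(check_field_definition(bad))
# ['missing required attribute: type', 'unknown attribute: maping']
```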

Row Decoding

For key and message, a decoder is used to map message and key data onto table columns.

The Kafka connector contains the following decoders:

  • raw - the Kafka message is not interpreted; ranges of raw message bytes are mapped to table columns
  • csv - the Kafka message is interpreted as a comma-separated message, and fields are mapped to table columns
  • json - the Kafka message is parsed as JSON, and JSON fields are mapped to table columns
  • avro - the Kafka message is parsed based on an Avro schema, and Avro fields are mapped to table columns

Note

If no table definition file exists for a table, the dummy decoder is used, which does not expose any columns.

raw Decoder

The raw decoder supports reading raw (byte-based) values from a Kafka message or key and converting them into openLooKeng columns.

For fields, the following attributes are supported:

  • dataFormat - selects the width of the data type converted
  • type - openLooKeng data type (see table below for list of supported data types)
  • mapping - <start>[:<end>]; start and end position of bytes to convert (optional)

The dataFormat attribute selects the number of bytes converted. If absent, BYTE is assumed. All values are signed.

Supported values are:

  • BYTE - one byte
  • SHORT - two bytes (big-endian)
  • INT - four bytes (big-endian)
  • LONG - eight bytes (big-endian)
  • FLOAT - four bytes (IEEE 754 format)
  • DOUBLE - eight bytes (IEEE 754 format)

The type attribute defines the openLooKeng data type on which the value is mapped.

Depending on the openLooKeng type assigned to a column, different values of dataFormat can be used:

openLooKeng data type  Allowed dataFormat values
BIGINT                 BYTE, SHORT, INT, LONG
INTEGER                BYTE, SHORT, INT
SMALLINT               BYTE, SHORT
TINYINT                BYTE
DOUBLE                 DOUBLE, FLOAT
BOOLEAN                BYTE, SHORT, INT, LONG
VARCHAR / VARCHAR(x)   BYTE

The mapping attribute specifies the range of the bytes in a key or message used for decoding. It can be one or two numbers separated by a colon (<start>[:<end>]).

If only a start position is given:

  • For fixed-width types, the column uses the appropriate number of bytes for the specified dataFormat (see above).
  • When a VARCHAR value is decoded, all bytes from the start position to the end of the message are used.

If both start and end positions are given:

  • For fixed-width types, the size must be equal to the number of bytes used by the specified dataFormat.
  • For VARCHAR, all bytes between start (inclusive) and end (exclusive) are used.

If no mapping attribute is specified, this is equivalent to setting the start position to 0 and leaving the end position undefined.

The decoding scheme for numeric data types (BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE) is straightforward. A sequence of bytes is read from the input message and decoded according to either:

  • big-endian encoding (for integer types)
  • IEEE 754 format (for DOUBLE).

The length of the decoded byte sequence is implied by the dataFormat. For the VARCHAR data type, a sequence of bytes is interpreted according to UTF-8 encoding.
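
The raw decoding rules can be illustrated with Python's struct module. This is a sketch of the rules described above, not the connector's code: decode_raw and the sample message layout are hypothetical, and big-endian byte order for FLOAT and DOUBLE is an assumption, since the text only specifies IEEE 754 for those formats.

```python
import struct

# struct codes per dataFormat value; ">" selects big-endian byte order.
# Big-endian for FLOAT and DOUBLE is an assumption: the text only says IEEE 754.
FORMATS = {"BYTE": ">b", "SHORT": ">h", "INT": ">i",
           "LONG": ">q", "FLOAT": ">f", "DOUBLE": ">d"}


def decode_raw(data, mapping="0", data_format="BYTE", varchar=False):
    """Decode one column from raw bytes using a <start>[:<end>] mapping."""
    start_text, _, end_text = mapping.partition(":")
    start = int(start_text)
    if varchar:
        # VARCHAR: use bytes up to end (exclusive), or to the end of the message.
        end = int(end_text) if end_text else len(data)
        return data[start:end].decode("utf-8")
    fmt = FORMATS[data_format]
    width = struct.calcsize(fmt)
    # Fixed-width types: an explicit end must match the dataFormat width.
    if end_text and int(end_text) - start != width:
        raise ValueError(f"{data_format} needs exactly {width} bytes")
    return struct.unpack_from(fmt, data, start)[0]


# Hypothetical message layout: 8-byte id, 4-byte quantity, then a UTF-8 label.
message = struct.pack(">qi", 42, -7) + "hello".encode("utf-8")

print(decode_raw(message, "0", "LONG"))         # 42
print(decode_raw(message, "8:12", "INT"))       # -7
print(decode_raw(message, "12", varchar=True))  # hello
```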

csv Decoder

The CSV decoder converts the bytes representing a message or key into a string using UTF-8 encoding and then interprets the result as a CSV (comma-separated value) line.

For fields, the type and mapping attributes must be defined:

  • type - openLooKeng data type (see table below for list of supported data types)
  • mapping - the index of the field in the CSV record

dataFormat and formatHint are not supported and must be omitted. The table below lists the supported openLooKeng types that can be used in type, together with their decoding rules:

openLooKeng data type               Decoding rules
BIGINT, INTEGER, SMALLINT, TINYINT  Decoded using Java Long.parseLong()
DOUBLE                              Decoded using Java Double.parseDouble()
BOOLEAN                             “true” character sequence maps to true; other character sequences map to false
VARCHAR / VARCHAR(x)                Used as is
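
The decoding rules in the table can be approximated in Python. This sketch is an illustration only: decode_csv and the sample fields are hypothetical, Python's int() and float() stand in for the Java parsers and differ from them in edge cases, and matching "true" case-sensitively is an assumption.

```python
import csv

INTEGER_TYPES = {"BIGINT", "INTEGER", "SMALLINT", "TINYINT"}


def decode_csv(data, fields):
    """Decode one CSV message into a dict of column name to value."""
    line = data.decode("utf-8")
    record = next(csv.reader([line]))
    row = {}
    for field in fields:
        text = record[int(field["mapping"])]  # mapping is the CSV field index
        column_type = field["type"].upper()
        if column_type in INTEGER_TYPES:
            value = int(text)       # stands in for Java Long.parseLong()
        elif column_type == "DOUBLE":
            value = float(text)     # stands in for Java Double.parseDouble()
        elif column_type == "BOOLEAN":
            value = text == "true"  # any other character sequence maps to false
        else:
            value = text            # VARCHAR is used as is
        row[field["name"]] = value
    return row


# Hypothetical field definitions for a four-column CSV topic.
fields = [
    {"name": "id", "type": "BIGINT", "mapping": "0"},
    {"name": "price", "type": "DOUBLE", "mapping": "1"},
    {"name": "in_stock", "type": "BOOLEAN", "mapping": "2"},
    {"name": "label", "type": "VARCHAR", "mapping": "3"},
]

print(decode_csv(b"17,3.5,true,widget", fields))
# {'id': 17, 'price': 3.5, 'in_stock': True, 'label': 'widget'}
```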

json Decoder

The JSON decoder converts the bytes representing a message or key into JSON according to RFC 4627. Note that the message or key MUST convert into a JSON object, not an array or simple type.

For fields, the following attributes are supported:

  • type - openLooKeng type of column.
  • dataFormat - Field decoder to be used for column.
  • mapping - slash-separated list of field names to select a field from the JSON object
  • formatHint - only for custom-date-time, see below

The JSON decoder supports multiple field decoders, with _default being used for standard table columns and a number of decoders for date and time based types.

The table below lists the openLooKeng data types that can be used in type and the matching field decoders that can be specified via the dataFormat attribute:

openLooKeng data type                                                     Allowed dataFormat values
BIGINT, INTEGER, SMALLINT, TINYINT, DOUBLE, BOOLEAN, VARCHAR, VARCHAR(x)  Default field decoder (omitted dataFormat attribute)
TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIME, TIME WITH TIME ZONE            custom-date-time, iso8601, rfc2822, milliseconds-since-epoch, seconds-since-epoch
DATE                                                                      custom-date-time, iso8601, rfc2822

Default Field decoder

This is the standard field decoder supporting all the openLooKeng physical data types. A field value will be coerced by JSON conversion rules into boolean, long, double or string values. For non-date/time based columns, this decoder should be used.
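
How a slash-separated mapping selects a value from a JSON message can be sketched as follows. The helper select_field and the sample message are hypothetical, and returning None (SQL NULL) for a missing field is an assumption made for this illustration.

```python
import json


def select_field(data, mapping):
    """Follow a slash-separated mapping into a JSON object message."""
    node = json.loads(data.decode("utf-8"))
    if not isinstance(node, dict):
        # The message or key must be a JSON object, not an array or simple type.
        raise ValueError("message must be a JSON object")
    for name in mapping.split("/"):
        if not isinstance(node, dict) or name not in node:
            return None  # assumption: a missing field yields NULL
        node = node[name]
    return node


# Hypothetical message with a nested object.
message = b'{"user": {"id": 7, "name": "ada"}, "active": true}'

print(select_field(message, "user/id"))     # 7
print(select_field(message, "active"))      # True
print(select_field(message, "user/email"))  # None
```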

Date and Time Decoders

To convert values from JSON objects into openLooKeng DATE, TIME, TIME WITH TIME ZONE, TIMESTAMP or TIMESTAMP WITH TIME ZONE columns, special decoders must be selected using the dataFormat attribute of a field definition.

  • iso8601 - text based, parses a text field as an ISO 8601 timestamp.

  • rfc2822 - text based, parses a text field as an RFC 2822 timestamp.

  • custom-date-time - text based, parses a text field according to Joda format pattern specified via formatHint attribute. Format pattern should conform to https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html.

  • milliseconds-since-epoch - number based, interprets a text or number as number of milliseconds since the epoch.

  • seconds-since-epoch - number based, interprets a text or number as number of seconds since the epoch.

For the TIMESTAMP WITH TIME ZONE and TIME WITH TIME ZONE data types, if time zone information is present in the decoded value, it is used in the openLooKeng value. Otherwise, the result time zone is set to UTC.
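
The difference between the decoders can be illustrated with Python's standard library. This is a sketch of the described behavior only, not the connector's implementation: decode_timestamp is a hypothetical helper, Python's ISO 8601 and RFC 2822 parsers may accept slightly different inputs than the connector, and custom-date-time is left out because it depends on Joda format patterns.

```python
from datetime import datetime, timedelta, timezone
from email.utils import parsedate_to_datetime

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_timestamp(value, data_format):
    """Decode a JSON field value into a time zone aware datetime."""
    if data_format == "milliseconds-since-epoch":
        return EPOCH + timedelta(milliseconds=int(value))
    if data_format == "seconds-since-epoch":
        return EPOCH + timedelta(seconds=int(value))
    if data_format == "iso8601":
        parsed = datetime.fromisoformat(value)
    elif data_format == "rfc2822":
        parsed = parsedate_to_datetime(value)
    else:
        raise ValueError(f"unsupported dataFormat: {data_format}")
    if parsed.tzinfo is None:
        # No time zone in the decoded value: fall back to UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


# All four values describe the same instant.
print(decode_timestamp("1591012800000", "milliseconds-since-epoch"))
print(decode_timestamp(1591012800, "seconds-since-epoch"))
print(decode_timestamp("2020-06-01T14:00:00+02:00", "iso8601"))
print(decode_timestamp("Mon, 01 Jun 2020 12:00:00 +0000", "rfc2822"))
```

Passing a milliseconds value to seconds-since-epoch (or the reverse) shifts the result by a factor of 1000, so the dataFormat must match the unit the producer writes.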

avro Decoder

The Avro decoder converts the bytes representing a message or key in Avro format based on a schema. The message must have the Avro schema embedded. openLooKeng does not support schema-less Avro decoding.

For a key or message that uses the avro decoder, the dataSchema must be defined. It should point to the location of a valid Avro schema file for the message to be decoded. This location can be a remote web server (e.g., dataSchema: 'http://example.org/schema/avro_data.avsc') or the local file system (e.g., dataSchema: '/usr/local/schema/avro_data.avsc'). The decoder fails if this location is not accessible from the openLooKeng coordinator node.

For fields, the following attributes are supported:

  • name - Name of the column in the openLooKeng table.
  • type - openLooKeng type of column.
  • mapping - slash-separated list of field names to select a field from the Avro schema. If the field specified in mapping does not exist in the original Avro schema, a read operation returns NULL.

The table below lists the supported openLooKeng types that can be used in type and the equivalent Avro field types.

openLooKeng data type  Allowed Avro data type
BIGINT                 INT, LONG
DOUBLE                 DOUBLE, FLOAT
BOOLEAN                BOOLEAN
VARCHAR / VARCHAR(x)   STRING
VARBINARY              FIXED, BYTES
ARRAY                  ARRAY
MAP                    MAP

Avro schema evolution

The Avro decoder supports schema evolution with backward compatibility. With backward compatibility, a newer schema can be used to read Avro data created with an older schema. Any change in the Avro schema must also be reflected in openLooKeng’s topic definition file. Newly added or renamed fields must have a default value in the Avro schema file.

The schema evolution behavior is as follows:

  • Column added in the new schema: Data created with an older schema produces a default value when the table uses the new schema.
  • Column removed in the new schema: Data created with an older schema no longer outputs the data from the column that was removed.
  • Column renamed in the new schema: This is equivalent to removing the column and adding a new one; data created with an older schema produces a default value when the table uses the new schema.
  • Column type changed in the new schema: If the type coercion is supported by Avro, the conversion happens. An error is thrown for incompatible types.
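
The first three rules can be modeled with plain Python dictionaries. This is a deliberately simplified model, not the Avro library or the connector's decoder: read_with_new_schema and the sample record are hypothetical, schemas are reduced to lists of field names with optional defaults, and type coercion is not modeled.

```python
def read_with_new_schema(old_record, new_fields):
    """Project a record written with an older schema onto a newer schema."""
    result = {}
    for field in new_fields:
        name = field["name"]
        if name in old_record:
            result[name] = old_record[name]  # field exists in both schemas
        elif "default" in field:
            result[name] = field["default"]  # added or renamed field
        else:
            raise ValueError(f"new field {name!r} needs a default value")
    return result  # fields removed from the new schema are simply dropped


# Record written with the old schema (fields: id, city).
old_record = {"id": 1, "city": "Oslo"}

# New schema: "city" renamed to "town", and "country" added.
new_fields = [
    {"name": "id"},
    {"name": "town", "default": "unknown"},
    {"name": "country", "default": None},
]

print(read_with_new_schema(old_record, new_fields))
# {'id': 1, 'town': 'unknown', 'country': None}
```

The renamed column shows why a rename behaves like a removal plus an addition: the old value under "city" is not carried over to "town".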