Kafka连接器

概述

此连接器允许在openLooKeng中将Apache Kafka主题用作表。每条消息在openLooKeng中显示为一行。

主题可以是实时的:当数据到达时,行将出现,当段被删除时,行将消失。如果在单个查询中多次访问同一个表(例如,执行自联接),这可能会导致奇怪的行为。

说明

Kafka代理最低支持版本为0.10.0。

配置

要配置Kafka连接器,创建具有以下内容的目录属性文件etc/catalog/kafka.properties,并适当替换以下属性:

  1. connector.name=kafka
  2. kafka.table-names=table1,table2
  3. kafka.nodes=host1:port,host2:port

多Kafka集群

可以根据需要创建任意多的目录,因此,如果有额外的Kafka集群,只需添加另一个不同的名称的属性文件到etc/catalog中(确保它以.properties结尾)。例如,如果将属性文件命名为sales.properties,openLooKeng将使用配置的连接器创建一个名为sales的目录。

配置属性

配置属性包括:

属性名称说明
kafka.table-names目录提供的所有表列表
kafka.default-schema表的默认模式名
kafka.nodesKafka集群节点列表
kafka.connect-timeout连接Kafka集群超时
kafka.buffer-sizeKafka读缓冲区大小
kafka.table-description-dir包含主题描述文件的目录
kafka.hide-internal-columns控制内部列是否是表模式的一部分

kafka.table-names

此目录提供的所有表的逗号分隔列表。表名可以是非限定的(简单名称),并将被放入默认模式(见下文)中,或者用模式名称(<schema-name>.<table-name>)限定。

对于这里定义的每个表,都可能存在一个表描述文件(见下文)。如果没有表描述文件,则使用表名作为Kafka的主题名称,且数据列不映射到表。该表仍将包含所有内部列(见下文)。

此属性是必需的;没有默认值,并且必须至少定义一个表。

kafka.default-schema

定义将包含没有定义限定模式名称的所有表的模式。

此属性是可选的;默认值为default

kafka.nodes

Kafka数据节点的hostname:port对的逗号分隔列表。

此属性是必需的;没有默认值,并且必须至少定义一个表。

说明

openLooKeng必须仍然能够连接到群集的所有节点,即使这里只指定了子集,因为段文件可能只位于特定的节点上。

kafka.connect-timeout

连接数据节点超时。繁忙的Kafka集群在接受连接之前可能要花费一些时间;当看到由于超时而导致的查询失败时,增加该值是一种很好的策略。

此属性是可选的;默认值为10秒(10s)。

kafka.buffer-size

从Kafka读取数据的内部数据缓冲区大小。数据缓冲区必须至少能够容纳一条消息,理想情况下可以容纳多条消息。每个工作节点和数据节点分配一个数据缓冲区。

此属性是可选的;默认值为64kb

kafka.table-description-dir

在openLooKeng部署中引用一个文件夹,其中包含一个或多个JSON文件(必须以.json结尾),其中包含表描述文件。

此属性是可选的;默认值为etc/kafka

kafka.hide-internal-columns

除了在表描述文件中定义数据列外,连接器还为每个表维护许多附加列。如果这些列是隐藏的,它们仍然可以在查询中使用,但是不会显示在DESCRIBE <table-name>SELECT *中。

此属性是可选的;默认值为true

内部列

对于每个已定义的表,连接器维护以下列:

列名类型说明
_partition_idBIGINT包含该行的Kafka分区ID。
_partition_offsetBIGINT此行在Kafka分区内的偏移量。
_segment_startBIGINT包含该行的段(包括该段)中最小的偏移量。这个偏移量是分区特定的。
_segment_endBIGINT包含该行的段(不包括该段)中最大的偏移量。这个偏移量是分区特定的。这与下一个段(如果存在)的_segment_start的值相同。
_segment_countBIGINT段内当前行的运行计数。对于未压缩的主题,_segment_start + _segment_count等于_partition_offset
_message_corruptBOOLEAN如果解码器无法解码此行的消息,则为true。如果为true,则应将消息映射的数据列视为无效。
_messageVARCHAR作为UTF-8编码的字符串的消息字节。这只对文本主题有用。
_message_lengthBIGINT消息字节数。
_key_corruptBOOLEAN如果解码器无法解码此行的键,则为true。如果为true,则应将该键映射的数据列视为无效。
_keyVARCHAR作为UTF-8编码的字符串的键字节。这只对文本键有用。
_key_lengthBIGINT键字节数。

对于没有表定义文件的表,_key_corrupt列和_message_corrupt列将始终为false

表定义文件

Kafka仅以字节消息的形式维护主题,并让生产者和消费者定义如何解释消息。对于openLooKeng,必须将这些数据映射到列中,以便允许对数据进行查询。

说明

对于包含JSON数据的文本主题,完全可以使用openLooKeng /functions/json来解析包含映射到UTF-8字符串中的字节的_message列,而不用任何表定义文件。但是这相当麻烦,并且使得编写SQL查询变得很困难。

表定义文件由一个表的JSON定义组成。文件名可以任意,但必须以.json结尾。

  1. {
  2. "tableName": ...,
  3. "schemaName": ...,
  4. "topicName": ...,
  5. "key": {
  6. "dataFormat": ...,
  7. "fields": [
  8. ...
  9. ]
  10. },
  11. "message": {
  12. "dataFormat": ...,
  13. "fields": [
  14. ...
  15. ]
  16. }
  17. }
字段是否必填类型说明
tableName必填string该文件定义的openLooKeng表名。
schemaName可选string将包含表的模式。如果省略,则使用默认模式名称。
topicName必填string映射的Kafka主题。
key可选JSON对象映射到消息键的数据列的字段定义。
message可选JSON对象映射到消息本身的数据列的字段定义。

Kafka中的键和消息

从Kafka 0.8版本开始,每个主题中的每条消息都可以有一个可选的键。表定义文件包含键和消息的节,用于将数据映射到表列。

表定义中的key字段和message字段均为必须包含两个字段的JSON对象:

字段是否必填类型说明
dataFormat必填string选择该组字段的解码器。
fields必填JSON数组字段定义列表。每个字段定义在openLooKeng表中创建一个新列。

每个字段定义都是一个JSON对象:

  1. {
  2. "name": ...,
  3. "type": ...,
  4. "dataFormat": ...,
  5. "mapping": ...,
  6. "formatHint": ...,
  7. "hidden": ...,
  8. "comment": ...
  9. }
字段是否必填类型说明
name必填stringopenLooKeng表中的列名。
type必填string列的openLooKeng类型。
dataFormat可选string选择该字段的列解码器。默认使用此行数据格式和列类型的默认解码器。
dataSchema可选stringAvro模式所在的路径或URL。仅用于Avro解码器。
mapping可选string列的映射信息。这是解码器特定的,见下文。
formatHint可选string设置列解码器的列特定格式提示。
hidden可选boolean将列对DESCRIBESELECT *隐藏。默认为false
comment可选string添加列注释,该注释通过DESCRIBE显示。

键或消息的字段说明不受限制。

行解码

对于键和消息,使用解码器将消息和键数据映射到表列。

Kafka连接器包含以下的解码器:

  • raw - 不解释Kafka消息,将原始消息字节范围映射到表列
  • csv - Kafka消息被解释为逗号分隔消息,字段映射到表列
  • json - Kafka消息解释为JSON,JSON字段映射到表列
  • avro - Kafka消息按照Avro模式解析,Avro字段映射到表列

说明

如果表没有定义文件,则使用dummy解码器,该解码器不暴露任何列。

raw解码器

Raw解码器支持从Kafka消息或键中读取原始(基于字节)值并将其转换为openLooKeng列。

对于字段,支持如下属性:

  • dataFormat-选择转换数据类型的宽度
  • type - openLooKeng数据类型(支持的数据类型列表见下表)
  • mapping - <start>[:<end>];要转换的字节的开始和结束位置(可选)

dataFormat属性选择转换的字节数。如果不填,则假定为BYTE。所有值都有符号。

支持的值为:

  • BYTE - 1字节
  • SHORT - 2字节(大端序)
  • INT - 4字节(大端序)
  • LONG - 8字节(大端序)
  • FLOAT - 4字节(IEEE 754格式)
  • DOUBLE - 8字节(IEEE 754格式)

type属性定义值映射到的openLooKeng数据类型。

根据分配给列的openLooKeng类型,可以使用不同的dataFormat值:

openLooKeng数据类型允许dataFormat
BIGINTBYTESHORTINTLONG
INTEGERBYTESHORTINT
SMALLINTBYTESHORT
TINYINTBYTE
DOUBLEDOUBLEFLOAT
BOOLEANBYTESHORTINTLONG
VARCHAR / VARCHAR(x)BYTE

mapping属性指定用于解码的键或消息中的字节范围。可以是1个或2个数字,中间用冒号隔开(<start>[:<end>])。

如果只给出起始位置:

  • 对于固定宽度类型,该列将对指定的dateFormat使用适当字节数(见上文)。
  • VARCHAR值被解码时,从起始位置到消息结尾的所有字节将被使用。

如果给出开始和结束位置,则:

  • 对于固定宽度类型,大小必须等于指定dataFormat所使用的字节数。
  • 对于VARCHAR,起始(包括)和结束(不包括)之间的所有字节都将被使用。

如果未指定mapping属性,则等效于将起始位置设置为0,而将结束位置设置为未定义。

数值数据类型(BIGINTINTEGERSMALLINTTINYINTDOUBLE)的解码方案非常简单。从输入消息中读取字节序列并根据以下任一条件进行解码:

  • 大端序编码(integer类型)
  • IEEE 754格式(用于DOUBLE)。

dataFormat所隐含的已解码字节序列的长度。对于VARCHAR数据类型,字节序列根据UTF-8编码进行解释。

csv解码器

CSV解码器将代表消息或键的字节转换为UTF-8编码的字符串,然后将结果解释为CSV(逗号分隔值)行。

对于字段,必须定义typemapping属性:

  • type - openLooKeng数据类型(支持的数据类型列表见下表)
  • mapping - CSV记录中字段的索引

dataFormatformatHint不支持,必须省略。下表列出了支持的openLooKeng类型,可用于type和解码方案:

openLooKeng数据类型解码规则
BIGINT INTEGER SMALLINT TINYINT使用Java Long.parseLong()解码
DOUBLE使用Java Double.parseDouble()解码
BOOLEAN“true”字符序列映射到true;其他字符序列映射到false
VARCHAR / VARCHAR(x)原样使用

json解码器

JSON解码器根据4627将代表消息或键的字节转换为JSON。请注意,消息或键必须转换为JSON对象,而不是数组或简单类型。

对于字段,支持如下属性:

  • type - 列的openLooKeng类型。
  • dataFormat - 用于列的字段解码器。
  • mapping - 以斜杠分隔的字段名列表,用于从JSON对象中选择字段
  • formatHint - 仅限custom-date-time,详见下文

JSON解码器支持多个字段解码器,_default用于标准表列和许多基于日期和时间的类型的解码器。

下表列出了可如type中使用的openLooKeng数据类型和可通过dataFormat属性指定的匹配字段解码器。

openLooKeng数据类型允许dataFormat
BIGINT INTEGER SMALLINT TINYINT DOUBLE BOOLEAN VARCHAR VARCHAR(x)默认字段解码器(省略dataFormat属性)
TIMESTAMP TIMESTAMP WITH TIME ZONE TIME TIME WITH TIME ZONEcustom-date-timeiso8601rfc2822milliseconds-since-epochseconds-since-epoch
DATEcustom-date-timeiso8601rfc2822

默认字段译码器

这是标准的字段解码器,支持所有的openLooKeng物理数据类型。通过JSON转换规则,字段值将被强制转换为boolean值、long值、double值或string值。对于非基于日期/时间的列,应使用此解码器。

日期和时间解码器

如果需要将JSON对象中的值转换为openLooKeng DATETIMETIME WITH TIME ZONETIMESTAMPTIMESTAMP WITH TIME ZONE列,则需要通过字段定义的dataFormat属性选择特定的解码器。

  • iso8601 - 基于文本,将文本字段解析为ISO8601时间戳。

  • rfc2822 - 基于文本,将文本字段解析为2822时间戳。

  • custom-date-time - 基于文本,根据通过formatHint属性指定的Joda格式模式解析一个文本字段。格式模式应符合https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html

  • milliseconds-since-epoch - 基于数字,将文本或数字解释为自epoch时间以来的毫秒数。

  • seconds-since-epoch - 基于数字,将文本或数字解释为自epoch时间以来的毫秒数。

对于TIMESTAMP WITH TIME ZONETIME WITH TIME ZONE数据类型,如果解码值中存在时区信息,则在openLooKeng值中使用时区。否则,结果时区将被设置为UTC

avro解码器

Avro解码器根据模式转换表示Avro格式的消息或键的字节。消息必须嵌入Avro模式。openLooKeng不支持无模式Avro解码。

对于键/消息,使用avro解码器时,必须定义dataSchema。这应该指向需要解码的消息的有效Avro模式文件的位置。此位置可以是远程Web服务器(例如:dataSchema: 'http://example.org/schema/avro_data.avsc')或本地文件系统(例如:dataSchema: '/usr/local/schema/avro_data.avsc')。如果无法从openLooKeng协调节点访问此位置,解码器将失败。

对于字段,支持如下属性:

  • name - openLooKeng表中的列名。
  • type - 列的openLooKeng类型。
  • mapping - 以斜杠分隔的字段名列表,用于从Avro模式中选择字段如果mapping中指定的字段在原始Avro模式中不存在,则读取操作将返回NULL。

下表列出了支持的openLooKeng类型,可在type中用于等价的Avro字段类型。

openLooKeng数据类型允许的Avro数据类型
BIGINTINTLONG
DOUBLEDOUBLEFLOAT
BOOLEANBOOLEAN
VARCHAR / VARCHAR(x)STRING
VARBINARYFIXEDBYTES
ARRAYARRAY
MAPMAP

Avro模式演进

Avro解码器支持向后兼容的模式演进特性。通过向后兼容性,就可以使用较新的模式读取用较旧的模式创建的Avro数据。Avro模式中的任何更改也必须反映在openLooKeng的主题定义文件中。Avro模式中新增/重命名的字段必须有默认值。

Schema的演进行为如下:

  • 新模式中增加的列:当表使用新模式时,使用旧模式创建的数据将产生默认值。
  • 新模式中移除的列:使用旧模式创建的数据将不再输出已移除列的数据。
  • 列在新的模式中被重命名:这等价于移除列并添加新列,当表使用新模式时,使用旧模式创建的数据将产生默认值。
  • 更改新模式中的列类型:如果Avro支持该类型强制,那么就会发生转换。不兼容的类型将引发错误。