流规格

数据类型

在 Kuiper 中,每个列或表达式都有一个相关的数据类型。 数据类型描述(约束)该类型的列可以容纳的一组值或该类型可以产生的表达式。

以下是支持的数据类型的列表。

#数据类型说明
1bigint
2float
3string
4datetime
5boolean
6bytea用于存储二进制数据的字节数组。如果在格式为 “JSON” 的流中使用此类型,则传入的数据需要为 base64 编码的字符串。
7array数组类型可以是任何简单类型,数组类型或结构类型。
8struct复杂类型

语言定义

  1. CREATE STREAM
  2. stream_name
  3. ( column_name <data_type> [ ,...n ] )
  4. WITH ( property_name = expression [, ...] );

支持的属性名称

属性名称可选说明
DATASOURCE取决于不同的源类型;如果是 MQTT 源,则为 MQTT 数据源主题名;其它源请参考相关的文档。
FORMAT传入的数据类型,支持 “JSON” 和 “BINARY”,默认为 “JSON” 。关于 “BINARY” 类型的更多信息,请参阅 Binary Stream
KEY保留配置,当前未使用该字段。 它将用于 GROUP BY 语句。
TYPE源类型,如未指定,值为 “mqtt”。
StrictValidation针对流模式控制消息字段的验证行为。 有关更多信息,请参见 Strict Validation
CONF_KEY如果需要配置其他配置项,请在此处指定 config 键。 有关更多信息,请参见 MQTT stream

示例1

  1. my_stream
  2. (id bigint, name string, score float)
  3. WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");

该流将订阅 MQTT 主题topic/temperature,服务器连接使用配置文件$kuiper/etc/mqtt_source.yaml 中默认部分的 servers 键。

示例2

  1. demo (
  2. USERID BIGINT,
  3. FIRST_NAME STRING,
  4. LAST_NAME STRING,
  5. NICKNAMES ARRAY(STRING),
  6. Gender BOOLEAN,
  7. ADDRESS STRUCT(STREET_NAME STRING, NUMBER BIGINT),
  8. ) WITH (DATASOURCE="test/", FORMAT="JSON", KEY="USERID", CONF_KEY="demo");

流将订阅 MQTT 主题 test/,服务器连接使用配置文件$kuiper/etc/mqtt_source.yaml 中 demo 部分的设置。

Strict Validation

  1. StrictValidation 的值可以为 true false
  2. 1True:如果消息不符合流定义,则删除消息。
  3. 2False:保留消息,但用默认的空值填充缺少的字段。
  4. bigint: 0
  5. float: 0.0
  6. string: ""
  7. datetime: (NOT support yet)
  8. boolean: false
  9. array: zero length array
  10. struct: null value

Schema-less 流

如果流的数据类型未知或不同,我们可以不使用字段来定义它。 这称为 schema-less。 通过将字段设置为空来定义它。

  1. schemaless_stream
  2. ()
  3. WITH ( datasource = "topic/temperature", FORMAT = "json", KEY = "id");

Schema-less 流字段数据类型将在运行时确定。 如果在不兼容子句中使用该字段,则会抛出运行时错误并将其发送到目标。 例如,where temperature > 30。 一旦温度不是数字,将错误发送到目标。

有关 SQL 语言的更多信息,请参见 查询语言元素

二进制流

对于二进制数据流,例如图像或者视频流,需要指定数据格式为 “BINARY” 。二进制流的数据为一个二进制数据块,不区分字段。所以,其流定义必须仅有一个 bytea 类型字段。如下流定义示例中,二进制流的数据将会解析为 demoBin 流中的 image 字段。

  1. demoBin (
  2. image BYTEA
  3. ) WITH (DATASOURCE="test/", FORMAT="BINARY");

如果 “BINARY” 格式流定义为 schemaless,数据将会解析到默认的名为 self 的字段。