CREATE SOURCE

语法说明

CREATE SOURCE 创建一个对流式数据的连接,并将一个新的 SOURCE 表添加到当前数据库中。

语法结构

  1. CREATE [OR REPLACE] SOURCE [IF NOT EXISTS] stream_name
  2. ( { column_name data_type [KEY | HEADERS | HEADER(key)] } [, ...] )
  3. WITH ( property_name = expression [, ...]);

语法解释

  • stream_name: SOURCE 名称。SOURCE 名称必须与当前数据库中任何现有的 SOURCE 名称不同。
  • column_name: 流式数据映射到 SOURCE 表中的列名。
  • data_type: column_name 对应字段在数据表中的类型。
  • property_name = expression: 有关流式数据映射的具体配置项名以及对应的值,可配置项如下:
property_nameexpression 描述
“type”仅支持’kafka’:目前仅支持接受的源为 kafka
“topic”kafka 数据源中对应的 topic
“partion”kafka 数据源中对应的 partion
“value”仅支持’json’: 目前仅支持接受的数据格式为 json
“bootstrap.servers”kafka 服务器对应的 IP:PORT
“sasl.username”指定连接到 Kafka 时使用的 SASL(Simple Authentication and Security Layer)用户名
“sasl.password”与 sasl.username 配对使用,这个参数提供了相应的密码
“sasl.mechanisms”客户端和服务器之间认证的 SASL 机制
“security.protocol”指定了与 Kafka 服务器通信时使用的安全协议

示例

  1. create source stream_test(c1 char(25),c2 varchar(500),c3 text,c4 tinytext,c5 mediumtext,c6 longtext )with(
  2. "type"='kafka',
  3. "topic"= 'test',
  4. "partition" = '0',
  5. "value"= 'json',
  6. "bootstrap.servers"='127.0.0.1:9092'
  7. )
  8. Query OK, 0 rows affected (0.01 sec)

限制

SOURCE 表目前不支持 drop 和 alter。

创建 SOURCE 表时目前仅支持连接 kafka,且仅支持传输数据格式为 json。