Kafka

功能介绍

写Kafka, 支持kafka 1.x和2.x版

参数说明

名称 中文名称 描述 类型 是否必须? 默认值
bootstrapServers “bootstrap.servers” “bootstrap.servers” String
topic topic名称 topic名称 String
properties 额外的kafka参数配置 额外的kafka参数配置,格式形如”prop1=val1,prop2=val2” String
dataFormat 数据格式 数据格式。json或csv String
fieldDelimiter 字段分隔符 字段分隔符 String “,”

脚本示例

  1. URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
  2. SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
  3. data = CsvSourceStreamOp().setFilePath(URL).setSchemaStr(SCHEMA_STR)
  4. sink = KafkaSinkStreamOp() \
  5. .setBootstrapServers("localhost:9092").setDataFormat("json") \
  6. .setTopic("iris")
  7. sink.linkFrom(data)
  8. StreamOperator.execute()