Kafka011

功能介绍

写Kafka 0.11版

参数说明

名称 中文名称 描述 类型 是否必须? 默认值
bootstrapServers “bootstrap.servers” “bootstrap.servers” String
topic topic名称 topic名称 String
dataFormat 数据格式 数据格式。json或csv String
fieldDelimiter 字段分隔符 字段分隔符 String “,”

脚本示例

  1. URL = "http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv";
  2. SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
  3. data = CsvSourceStreamOp().setFilePath(URL).setSchemaStr(SCHEMA_STR)
  4. sink = Kafka011SinkStreamOp() \
  5. .setBootstrapServers("localhost:9092").setDataFormat("json") \
  6. .setTopic("iris")
  7. sink.linkFrom(data)
  8. StreamOperator.execute()