Description

kafka 011 sink.

Parameters

Name Description Type Required? Default Value
bootstrapServers kafka集群地址 String
topic topic String
dataFormat data format String
fieldDelimiter Field delimiter String “,”

Script Example

  1. URL = "http://alink-dataset.cn-hangzhou.oss.aliyun-inc.com/csv/iris.csv";
  2. SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
  3. data = CsvSourceStreamOp().setFilePath(URL).setSchemaStr(SCHEMA_STR)
  4. sink = Kafka011SinkStreamOp() \
  5. .setBootstrapServers("localhost:9092").setDataFormat("json") \
  6. .setTopic("iris")
  7. sink.linkFrom(data)
  8. StreamOperator.execute()