Description

Data sink for Kafka 1.x and 2.x.

Parameters

| Name | Description | Type | Required? | Default Value |
| --- | --- | --- | --- | --- |
| bootstrapServers | Kafka bootstrap servers | String | | |
| topic | Topic | String | | |
| properties | Additional Kafka configurations, such as "prop1=val1,prop2=val2" | String | | |
| dataFormat | Data format | String | | |
| fieldDelimiter | Field delimiter | String | | "," |
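
The `properties` value is a single comma-separated string of `key=value` pairs. As a minimal sketch in plain Python, the snippet below composes such a string from a dictionary and splits it back. The helpers `build_properties` and `parse_properties` are hypothetical and not part of Alink, the Kafka keys are ordinary producer settings chosen for illustration, and the sketch assumes the format has no escaping mechanism.

```python
def build_properties(options):
    """Join a dict of settings into "k1=v1,k2=v2" form (hypothetical helper)."""
    for key, value in options.items():
        # Assumption: no escaping exists, so ',' or '=' in a key,
        # or ',' in a value, could not be represented unambiguously.
        if "," in key or "=" in key or "," in str(value):
            raise ValueError(f"cannot encode {key!r}={value!r}")
    return ",".join(f"{key}={value}" for key, value in options.items())


def parse_properties(text):
    """Split "k1=v1,k2=v2" back into a dict of strings (hypothetical helper)."""
    result = {}
    for pair in text.split(","):
        if not pair.strip():
            continue  # tolerate empty segments such as a trailing comma
        key, _, value = pair.partition("=")
        result[key.strip()] = value.strip()
    return result


props = build_properties({"acks": "all", "linger.ms": 5})
print(props)
print(parse_properties(props))
```

The string held in `props` has the shape that the `properties` parameter describes. Note that parsing returns every value as a string.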

Script Example

    from pyalink.alink import *

    # Read the iris dataset as a stream source.
    URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv"
    SCHEMA_STR = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string"
    data = CsvSourceStreamOp().setFilePath(URL).setSchemaStr(SCHEMA_STR)

    # Write every row to the "iris" topic in JSON format.
    sink = KafkaSinkStreamOp() \
        .setBootstrapServers("localhost:9092") \
        .setDataFormat("json") \
        .setTopic("iris")
    sink.linkFrom(data)

    # Start the streaming job.
    StreamOperator.execute()
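
To picture what a message in the topic might look like, the sketch below encodes one iris row in two ways using only the Python standard library: as a JSON object keyed by column name, and as a delimited line that uses a field delimiter. This illustrates the general idea behind `dataFormat` and `fieldDelimiter`; it is not Alink's serializer, and the exact bytes written by `KafkaSinkStreamOp` may differ. The helper `encode_row` is hypothetical, and `"csv"` is assumed here as the delimited counterpart of `"json"`.

```python
import json

# Column names taken from SCHEMA_STR in the script example.
COLUMNS = ["sepal_length", "sepal_width", "petal_length", "petal_width", "category"]


def encode_row(row, data_format="json", field_delimiter=","):
    """Encode one row as a message string (hypothetical helper, not Alink code)."""
    if data_format == "json":
        # One JSON object per row, keyed by column name.
        return json.dumps(dict(zip(COLUMNS, row)))
    if data_format == "csv":
        # One delimited line per row; the delimiter only matters in this mode.
        return field_delimiter.join(str(value) for value in row)
    raise ValueError(f"unsupported data format: {data_format!r}")


row = [5.1, 3.5, 1.4, 0.2, "Iris-setosa"]
json_message = encode_row(row)
csv_message = encode_row(row, data_format="csv", field_delimiter="|")
print(json_message)
print(csv_message)
```

Under these assumptions, a consumer reading the `iris` topic would decode the JSON form with any JSON parser, while the delimited form requires the consumer to know the delimiter and the column order.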