Deployment

Currently, inlong-sort is based on Flink. Before you run an inlong-sort application, you need to set up a Flink environment.

Currently, inlong-sort relies on Flink 1.13.5. Choose flink-1.13.5-bin-scala_2.11.tgz when downloading the package.
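For example, a minimal sketch of downloading and unpacking the distribution, assuming the standard Apache archive URL layout:

  # Download Flink 1.13.5 built for Scala 2.11 (archive URL assumed)
  wget https://archive.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.11.tgz
  # Unpack into ./flink-1.13.5
  tar -xzf flink-1.13.5-bin-scala_2.11.tgz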

Once your Flink environment is set up, you can visit the Flink Web UI; its address is configured in /${your_flink_path}/conf/masters.
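As a minimal sketch, assuming a standalone cluster started from the unpacked distribution directory:

  cd flink-1.13.5
  # Start a local standalone Flink cluster
  ./bin/start-cluster.sh
  # The Web UI address is listed here (localhost:8081 by default in standalone mode)
  cat conf/masters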

Prepare installation files

All installation files are in the inlong-sort directory.
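For instance, after unpacking an InLong release you can verify the files are in place (the jar name below depends on the release version):

  # List the inlong-sort installation files
  ls inlong-sort/
  # Expect the compiled jar, e.g. sort-[version].jar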

Starting an inlong-sort application

Now you can submit the job to Flink with the compiled jar; refer to the Flink documentation on how to submit a job.

Example:

  ./bin/flink run -c org.apache.inlong.sort.singletenant.flink.Entrance inlong-sort/sort-[version].jar \
  --cluster-id debezium2hive --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-hive.json \
  --source.type pulsar --sink.type hive --sink.hive.rolling-policy.rollover-interval 60000 \
  --metrics.audit.proxy.hosts 127.0.0.1:10081 --sink.hive.rolling-policy.check-interval 30000

Notice:

  • -c org.apache.inlong.sort.singletenant.flink.Entrance is the main class name

  • inlong-sort/sort-[version].jar is the compiled jar

Necessary configurations

  • --cluster-id represents a specified inlong-sort application; it must be the same as the sort.appName configuration in inlong-manager
  • --dataflow.info.file the path of the dataflow configuration file
  • --source.type source of the application, currently “pulsar” is supported
  • --sink.type sink of the application, currently “clickhouse”, “hive”, “iceberg”, “kafka” are supported
  • --metrics.audit.proxy.hosts audit proxy host address for reporting audit metrics

Example

  --cluster-id debezium2kafka-canal --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-kafka-canal.json \
  --source.type pulsar --sink.type kafka
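These are only the application arguments; a complete submit command follows the same pattern as the first example:

  ./bin/flink run -c org.apache.inlong.sort.singletenant.flink.Entrance inlong-sort/sort-[version].jar \
  --cluster-id debezium2kafka-canal --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-kafka-canal.json \
  --source.type pulsar --sink.type kafka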

All configurations

| name | necessary | default value | description |
|------|-----------|---------------|-------------|
| cluster-id | Y | NA | used to represent a specified inlong-sort application |
| source.type | Y | NA | source of the application, currently "pulsar" is supported |
| sink.type | Y | NA | sink of the application, currently "clickhouse", "hive", "iceberg" and "kafka" are supported |
| metrics.audit.proxy.hosts | Y | NA | audit proxy host address for reporting audit metrics |
| source.parallelism | N | 1 | parallelism of the source |
| deserialization.parallelism | N | 1 | parallelism of deserialization |
| transformation.parallelism | N | 1 | parallelism of transformation |
| sink.parallelism | N | 1 | parallelism of the sink |
| checkpoint.interval | N | 600000 | checkpoint interval, unit: ms |
| min.pause.between.checkpoints.ms | N | 500 | the minimal pause between two checkpoints, unit: ms |
| checkpoint.timeout.ms | N | 600000 | checkpoint timeout, unit: ms |
| sink.field.type.string.nullable | N | false | whether a sink field of string type can be null or empty |
| sink.field.type.int.nullable | N | true | whether a sink field of int type can be null or empty |
| sink.field.type.short.nullable | N | true | whether a sink field of short type can be null or empty |
| sink.field.type.long.nullable | N | true | whether a sink field of long type can be null or empty |
| sink.hive.rolling-policy.file-size | N | 134217728 | the maximum part file size before rolling, unit: byte |
| sink.hive.rolling-policy.rollover-interval | N | 1800000 | the maximum time duration a part file can stay open before rolling (by default long enough to avoid too many small files); the frequency at which this is checked is controlled by the 'sink.hive.rolling-policy.check-interval' option, unit: ms |
| sink.hive.rolling-policy.check-interval | N | 60000 | the interval for checking time-based rolling policies; this controls how frequently to check whether a part file should roll over based on 'sink.hive.rolling-policy.rollover-interval', unit: ms |
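As a sketch of combining the optional options above with the required ones (the parallelism and interval values are illustrative, not recommendations):

  ./bin/flink run -c org.apache.inlong.sort.singletenant.flink.Entrance inlong-sort/sort-[version].jar \
  --cluster-id debezium2hive --dataflow.info.file /YOUR_DATAFLOW_INFO_DIR/debezium-to-hive.json \
  --source.type pulsar --sink.type hive --metrics.audit.proxy.hosts 127.0.0.1:10081 \
  --sink.parallelism 4 --checkpoint.interval 300000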