Pulsar2Elasticsearch Example

Get the module archive

The module archive is located in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is named apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Modify the configuration files

First, decompress the archive file, then copy the three files in the directory "conf/es/" to the directory "conf/":

  • conf/common.properties: common configuration shared by all components.
  • conf/SortClusterConfig.conf: sink configuration of all sort tasks.
  • conf/sid_es_v3.conf: an example data source configuration for one sort task; the file name is the same as the sort task name ("sid_es_v3") in SortClusterConfig.conf.

Example: conf/common.properties

  clusterId=esv3-sz-sz1
  nodeId=nodeId
  metricDomains=Sort
  metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
  metricDomains.Sort.snapshotInterval=60000
  sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
  sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
  sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
  sortClusterConfig.type=file
  sortClusterConfig.file=SortClusterConfig.conf
  sortSourceConfig.QueryConsumeConfigType=file
  # manager config example (load the configuration from InLong Manager instead of local files)
  #sortClusterConfig.type=manager
  #sortSourceConfig.QueryConsumeConfigType=manager
  #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
  #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
  #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

Example: conf/SortClusterConfig.conf

  {
    "clusterName": "esv3-gz-gz1",
    "sortTasks": [{
      "name": "sid_es_v3",
      "type": "ES",
      "idParams": [{
        "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
        "contentOffset": "0",
        "inlongGroupId": "testgroup",
        "fieldOffset": "0",
        "fieldNames": "ftime extinfo t1 t2 t3 t4",
        "inlongStreamId": "0fc00000046",
        "separator": "|"
      }],
      "sinkParams": {
        "httpHosts": "ip:port",
        "password": "password",
        "bulkSizeMb": "10",
        "flushInterval": "60",
        "keywordMaxLength": "32767",
        "bulkAction": "4000",
        "concurrentRequests": "5",
        "maxConnect": "10",
        "isUseIndexId": "false",
        "username": "elastic"
      }
    }]
  }

Example: conf/sid_es_v3.conf

  {
    "sortClusterName": "esv3-gz-gz1",
    "sortTaskId": "sid_es_v3",
    "cacheZones": {
      "pc_inlong6th_sz1": {
        "zoneName": "${PULSAR_CLUSTER_NAME}",
        "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
        "authentication": "${PULSAR_AUTH}",
        "topics": [{
          "topic": "${TENANT/NAMESPACE/TOPIC}",
          "partitionCnt": 10,
          "topicProperties": {}
        }],
        "cacheZoneProperties": {},
        "zoneType": "pulsar"
      }
    }
  }