Elasticsearch Example

Get the module archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Modify the configuration files

First, decompress the archive file, then copy the three files in the directory “conf/es/” to the directory “conf/”.

  • conf/common.properties, common configuration of all components.
  • conf/SortClusterConfig.conf, sink configuration of all sort tasks.
  • conf/sid_es_v3.conf, data source configuration example of a sort task; the file name is the same as the sort task name in SortClusterConfig.conf.
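The copy step can also be scripted. Below is a minimal Java sketch using java.nio.file, assuming the archive has already been decompressed and that the hypothetical installDir argument is the directory that contains conf/. It copies every regular file in conf/es/ into conf/, overwriting files with the same name; the class name CopyEsConfig is illustrative only.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: copies the Elasticsearch example config from conf/es/ into conf/. */
public class CopyEsConfig {

    public static List<Path> copyEsConfig(Path installDir) throws IOException {
        Path source = installDir.resolve("conf").resolve("es");
        Path target = installDir.resolve("conf");
        List<Path> copied = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(source)) {
            for (Path file : files) {
                if (!Files.isRegularFile(file)) {
                    continue; // skip sub-directories
                }
                Path dest = target.resolve(file.getFileName());
                // Overwrite any existing file of the same name in conf/.
                Files.copy(file, dest, StandardCopyOption.REPLACE_EXISTING);
                copied.add(dest);
            }
        }
        return copied;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the first argument is the decompressed archive root; defaults to ".".
        Path installDir = Path.of(args.length > 0 ? args[0] : ".");
        for (Path p : copyEsConfig(installDir)) {
            System.out.println("copied " + p);
        }
    }
}
```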

Example: conf/common.properties

  clusterId=esv3-sz-sz1
  nodeId=nodeId
  metricDomains=Sort
  metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
  metricDomains.Sort.snapshotInterval=60000
  sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
  sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
  sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
  sortClusterConfig.type=file
  sortClusterConfig.file=SortClusterConfig.conf
  sortSourceConfig.QueryConsumeConfigType=file
  # manager config example
  #sortClusterConfig.type=manager
  #sortSourceConfig.QueryConsumeConfigType=manager
  #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
  #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
  #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource
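A quick sanity check of common.properties can be done with java.util.Properties, which treats lines starting with # as comments, so the commented-out manager block has no effect. The sketch below is a hypothetical helper, not part of InLong: the keys it checks (sortClusterConfig.file in file mode, the two managerUrl keys in manager mode) are inferred from the example above and are not an exhaustive list of what Sort Standalone requires.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical checker for a few mode-dependent keys of common.properties. */
public class CommonPropertiesCheck {

    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        // Lines whose first non-blank character is '#' are skipped as comments.
        props.load(new StringReader(text));
        return props;
    }

    /** Lists keys that the example suggests are needed for the chosen loading mode. */
    public static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        String clusterType = props.getProperty("sortClusterConfig.type");
        if (clusterType == null) {
            missing.add("sortClusterConfig.type");
        } else if (clusterType.equals("file")) {
            require(props, "sortClusterConfig.file", missing);
        } else if (clusterType.equals("manager")) {
            require(props, "sortClusterConfig.managerUrl", missing);
        }
        String sourceType = props.getProperty("sortSourceConfig.QueryConsumeConfigType");
        if (sourceType == null) {
            missing.add("sortSourceConfig.QueryConsumeConfigType");
        } else if (sourceType.equals("manager")) {
            require(props, "sortSourceConfig.managerUrl", missing);
        }
        return missing;
    }

    private static void require(Properties props, String key, List<String> missing) {
        String value = props.getProperty(key);
        if (value == null || value.isBlank()) {
            missing.add(key);
        }
    }
}
```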

Example: conf/SortClusterConfig.conf

  {
    "clusterName": "esv3-gz-gz1",
    "sortTasks": [{
      "name": "sid_es_v3",
      "type": "ES",
      "idParams": [{
        "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
        "contentOffset": "0",
        "inlongGroupId": "testgroup",
        "fieldOffset": "0",
        "fieldNames": "ftime extinfo t1 t2 t3 t4",
        "inlongStreamId": "0fc00000046",
        "separator": "|"
      }],
      "sinkParams": {
        "httpHosts": "ip:port",
        "password": "password",
        "bulkSizeMb": "10",
        "flushInterval": "60",
        "keywordMaxLength": "32767",
        "bulkAction": "4000",
        "concurrentRequests": "5",
        "maxConnect": "10",
        "isUseIndexId": "false",
        "username": "elastic"
      }
    }]
  }
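The indexNamePattern in this example contains the {yyyyMMdd} variable. A minimal Java sketch of how such a pattern could be expanded with java.time follows; it assumes the caller decides which timestamp to use (record time or current time), which this page does not specify, and it is an illustration rather than the EsSink implementation. IndexNameResolver is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical expansion of the date-time variables in indexNamePattern. */
public class IndexNameResolver {

    // Date-time variables listed for indexNamePattern on this page.
    private static final String[] VARIABLES = {"yyyyMMddHHmm", "yyyyMMddHH", "yyyyMMdd"};

    /** Replaces each {pattern} variable with the given time formatted by that pattern. */
    public static String resolve(String indexNamePattern, LocalDateTime time) {
        String result = indexNamePattern;
        for (String variable : VARIABLES) {
            String token = "{" + variable + "}";
            if (result.contains(token)) {
                String formatted = time.format(DateTimeFormatter.ofPattern(variable));
                result = result.replace(token, formatted);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2023, 5, 1, 14, 30);
        // prints inlong0fc00000046_20230501
        System.out.println(resolve("inlong0fc00000046_{yyyyMMdd}", time));
    }
}
```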

Example: conf/sid_es_v3.conf

  {
    "sortClusterName": "esv3-gz-gz1",
    "sortTaskId": "sid_es_v3",
    "cacheZones": {
      "pc_atta6th_sz1": {
        "zoneName": "${PULSAR_CLUSTER_NAME}",
        "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
        "authentication": "${PULSAR_AUTH}",
        "topics": [{
          "topic": "${TENANT/NAMESPACE/TOPIC}",
          "partitionCnt": 10,
          "topicProperties": {}
        }],
        "cacheZoneProperties": {},
        "zoneType": "pulsar"
      }
    }
  }

Modify configuration: idParams of Elasticsearch sort task

| Parameter | Required | Default Value | Remark |
|---|---|---|---|
| inlongGroupId | Y | NA | InLong group ID |
| inlongStreamId | Y | NA | InLong stream ID |
| separator | Y | NA | Separator of the InLong data stream in the data source |
| fieldNames | Y | NA | Field name list of the Elasticsearch index, separated by spaces |
| indexNamePattern | Y | NA | Index name pattern of Elasticsearch; supported date-time variables are {yyyyMMdd}, {yyyyMMddHH} and {yyyyMMddHHmm} |
| contentOffset | Y | NA | Field index offset of the source content |
| fieldOffset | Y | NA | Offset of the Elasticsearch index field name list |
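To illustrate how separator, fieldNames, contentOffset and fieldOffset relate, here is a hypothetical Java sketch that splits one source record and pairs its values with field names. The reading of the two offsets (skip contentOffset values at the start of the record and fieldOffset names at the start of the name list) is an assumption based on the remarks in the table, not confirmed EsSink behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical mapping of one delimited record to Elasticsearch document fields. */
public class FieldMapper {

    public static Map<String, String> map(String content, String separator, String fieldNames,
                                          int contentOffset, int fieldOffset) {
        // Pattern.quote: a separator such as "|" would otherwise be read as regex alternation.
        // Limit -1 keeps trailing empty fields instead of dropping them.
        String[] values = content.split(Pattern.quote(separator), -1);
        // fieldNames is a space-separated list, e.g. "ftime extinfo t1 t2 t3 t4".
        String[] names = fieldNames.trim().split("\\s+");
        Map<String, String> doc = new LinkedHashMap<>();
        int v = contentOffset;
        int n = fieldOffset;
        // Pair values and names until either list runs out.
        while (v < values.length && n < names.length) {
            doc.put(names[n], values[v]);
            v++;
            n++;
        }
        return doc;
    }
}
```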

Modify configuration: sinkParams of Elasticsearch sort task

| Parameter | Required | Default Value | Remark |
|---|---|---|---|
| httpHosts | Y | NA | Hosts of Elasticsearch |
| username | Y | NA | Username of Elasticsearch |
| password | Y | NA | Password of Elasticsearch |
| isUseIndexId | N | false | Whether to create an index ID |
| bulkSizeMb | N | 10 | Max content size per bulk (MB) |
| flushInterval | N | 60 | Max interval between flush operations (seconds) |
| keywordMaxLength | N | 32767 | Max keyword length (bytes) |
| bulkAction | N | 4000 | Max index requests per bulk |
| maxConnect | N | 10 | Max open HTTP connections |
| concurrentRequests | N | 5 | Max concurrent requests per HTTP connection |
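bulkAction, bulkSizeMb and flushInterval act as flush thresholds for a pending bulk. The Java sketch below models the rule with a hypothetical BulkFlushPolicy class; it assumes a bulk is sent as soon as any one threshold is reached (the way the Elasticsearch BulkProcessor combines its limits), which this page does not state explicitly, and it leaves out maxConnect, concurrentRequests and the HTTP calls themselves.

```java
/** Hypothetical flush decision for one pending bulk, driven by the sinkParams thresholds. */
public class BulkFlushPolicy {

    private final int bulkAction;            // max index requests per bulk
    private final long bulkSizeBytes;        // bulkSizeMb converted to bytes
    private final long flushIntervalMillis;  // flushInterval converted to milliseconds

    private int pendingRequests;
    private long pendingBytes;
    private long lastFlushMillis;

    public BulkFlushPolicy(int bulkAction, int bulkSizeMb, int flushIntervalSeconds, long nowMillis) {
        this.bulkAction = bulkAction;
        this.bulkSizeBytes = bulkSizeMb * 1024L * 1024L;
        this.flushIntervalMillis = flushIntervalSeconds * 1000L;
        this.lastFlushMillis = nowMillis;
    }

    /** Records one index request; returns true if the bulk should be flushed now. */
    public boolean add(long requestBytes, long nowMillis) {
        pendingRequests++;
        pendingBytes += requestBytes;
        return shouldFlush(nowMillis);
    }

    /** True when there is pending work and any one threshold has been reached. */
    public boolean shouldFlush(long nowMillis) {
        if (pendingRequests == 0) {
            return false;
        }
        return pendingRequests >= bulkAction
                || pendingBytes >= bulkSizeBytes
                || nowMillis - lastFlushMillis >= flushIntervalMillis;
    }

    /** Resets the counters after the caller has sent the bulk. */
    public void onFlushed(long nowMillis) {
        pendingRequests = 0;
        pendingBytes = 0;
        lastFlushMillis = nowMillis;
    }
}
```

With the defaults from the table (4000, 10, 60), the 4000th small request triggers a flush, as does a single request of more than 10 MB or a pending request that is still waiting 60 seconds after the last flush.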

Modify configuration: sid_es_v3.conf

  • The file name is the sort task name plus the suffix “.conf”.

| Parameter | Required | Type | Default Value | Remark |
|---|---|---|---|---|
| sortClusterName | Y | String | NA | inlong-sort-standalone cluster ID |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache cluster list, Map<cacheClusterName, CacheCluster> |
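Because the data source file is named after its sort task, the file for a task can be located from the task name, and the sortTaskId inside a file can be cross-checked against the file name. A minimal Java sketch, assuming the files sit in conf/ as described in the copy step; SourceConfigFile and its methods are hypothetical.

```java
import java.nio.file.Path;

/** Hypothetical helpers for the "<sort task name>.conf" naming rule. */
public class SourceConfigFile {

    private static final String SUFFIX = ".conf";

    /** Returns confDir/<sortTaskName>.conf, e.g. conf/sid_es_v3.conf. */
    public static Path forTask(Path confDir, String sortTaskName) {
        if (sortTaskName == null || sortTaskName.isBlank()) {
            throw new IllegalArgumentException("sort task name is required");
        }
        return confDir.resolve(sortTaskName + SUFFIX);
    }

    /** Checks that the sortTaskId inside a file matches the task name encoded in its file name. */
    public static boolean nameMatchesTaskId(Path file, String sortTaskId) {
        String fileName = file.getFileName().toString();
        if (!fileName.endsWith(SUFFIX)) {
            return false;
        }
        String taskName = fileName.substring(0, fileName.length() - SUFFIX.length());
        return taskName.equals(sortTaskId);
    }
}
```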

Modify configuration: CacheCluster

| Parameter | Required | Type | Default Value | Remark |
|---|---|---|---|---|
| zoneName | Y | String | NA | Cache cluster name |
| zoneType | Y | String | NA | One of pulsar, tube, kafka |
| serviceUrl | Y | String | NA | Pulsar serviceUrl or Kafka broker list |
| authentication | Y | String | NA | Pulsar authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache consumer configuration |
| topics | N | List<Topic> | NA | Topic list of the cache consumer |
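The required fields and allowed zoneType values in the table can be checked before starting Sort Standalone. A hypothetical Java sketch follows; it treats a CacheCluster as a flat map of its string fields (ignoring topics and cacheZoneProperties) and only checks presence, because the table does not say whether empty values, such as an empty authentication string, are accepted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical validator for the string fields of one CacheCluster entry. */
public class CacheClusterValidator {

    private static final Set<String> ZONE_TYPES = Set.of("pulsar", "tube", "kafka");
    private static final List<String> REQUIRED =
            List.of("zoneName", "zoneType", "serviceUrl", "authentication");

    /** Returns a list of problems; an empty list means the checks passed. */
    public static List<String> validate(Map<String, String> fields) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            // Presence check only: empty strings are not rejected here.
            if (fields.get(key) == null) {
                problems.add("missing " + key);
            }
        }
        String type = fields.get("zoneType");
        if (type != null && !ZONE_TYPES.contains(type)) {
            problems.add("unsupported zoneType " + type);
        }
        return problems;
    }
}
```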

Modify configuration: Topic

| Parameter | Required | Type | Default Value | Remark |
|---|---|---|---|---|
| topic | Y | String | NA | Cache topic name |
| partitionCnt | Y | Integer | NA | Cache topic partition count |
| topicProperties | N | Map<String,String> | NA | Cache topic configuration |
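For a Pulsar cache cluster, partitionCnt is the number of partitions of the topic. Pulsar names the partitions of a partitioned topic as the topic name followed by "-partition-" and an index starting at 0. The Java sketch below lists those names, assuming the topic is given in the three-part tenant/namespace/topic form used in the example (or is already fully qualified); PulsarPartitions is a hypothetical name and the sketch does not contact a Pulsar cluster.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listing of the partition names implied by topic and partitionCnt. */
public class PulsarPartitions {

    public static List<String> partitionNames(String topic, int partitionCnt) {
        if (partitionCnt <= 0) {
            throw new IllegalArgumentException("partitionCnt must be positive");
        }
        // Assumption: "tenant/namespace/topic" without a scheme refers to a persistent topic.
        String fullName = topic.contains("://") ? topic : "persistent://" + topic;
        List<String> names = new ArrayList<>(partitionCnt);
        for (int i = 0; i < partitionCnt; i++) {
            names.add(fullName + "-partition-" + i);
        }
        return names;
    }
}
```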