Elasticsearch Example

Get the module archive

The module archive is located in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is named apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Prepare the configuration files

First, decompress the archive file, then copy the three files in the directory “conf/es/” to the directory “conf/”.

  • conf/common.properties: common configuration of all components.
  • conf/SortClusterConfig.conf: sink configuration of all sort tasks.
  • conf/sid_es_v3.conf: an example data source configuration of a sort task; the file name matches the sort task name in SortClusterConfig.conf.
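If you prefer to script the copy step, the following Python sketch copies every regular file from conf/es/ into conf/. It assumes the archive has already been decompressed and that `home` points at the decompressed directory; the helper name `install_es_conf` is hypothetical and not part of InLong.

```python
import shutil
from pathlib import Path


def install_es_conf(home: Path) -> list:
    """Copy the Elasticsearch example configs from conf/es/ into conf/.

    Hypothetical helper: `home` is assumed to be the decompressed
    inlong-sort-standalone directory. Existing files in conf/ are overwritten.
    """
    src_dir = home / "conf" / "es"
    dst_dir = home / "conf"
    copied = []
    for src in sorted(src_dir.iterdir()):
        if src.is_file():  # skip any sub-directories
            dst = dst_dir / src.name
            shutil.copy2(src, dst)  # copy content and file metadata
            copied.append(dst)
    return copied
```

Run from inside the decompressed directory, `install_es_conf(Path("."))` returns the paths of the copied files.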

Example: conf/common.properties

  clusterId=esv3-sz-sz1
  nodeId=nodeId
  metricDomains=Sort
  metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
  metricDomains.Sort.snapshotInterval=60000
  sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
  sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
  sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
  sortClusterConfig.type=file
  sortClusterConfig.file=SortClusterConfig.conf
  sortSourceConfig.QueryConsumeConfigType=file
  #sortTaskId.conf
  #sortClusterConfig.type=manager
  #sortSourceConfig.QueryConsumeConfigType=manager
  #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
  #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
  #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

Example: conf/SortClusterConfig.conf

  {
    "clusterName": "esv3-gz-gz1",
    "sortTasks": [{
      "name": "sid_es_v3",
      "type": "ES",
      "idParams": [{
        "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
        "contentOffset": "0",
        "inlongGroupId": "atta",
        "fieldOffset": "2",
        "fieldNames": "ftime extinfo t1 t2 t3 t4",
        "inlongStreamId": "0fc00000046",
        "separator": "|"
      }],
      "sinkParams": {
        "httpHosts": "11.187.135.221:9200",
        "password": "yingyan@ES",
        "auditSetName": "es-rmrv7g7a",
        "bulkSizeMb": "10",
        "flushInterval": "60",
        "keywordMaxLength": "32767",
        "bulkAction": "4000",
        "concurrentRequests": "5",
        "maxConnect": "10",
        "isUseIndexId": "false",
        "username": "elastic"
      }
    }]
  }

Example: conf/sid_es_v3.conf

  {
    "sortClusterName": "esv3-gz-gz1",
    "sortTaskId": "sid_es_v3",
    "cacheZones": {
      "pc_atta6th_sz1": {
        "zoneName": "pc_atta6th_sz1",
        "serviceUrl": "http://9.139.53.86:8080",
        "authentication": "eyJrZXlJZCI6InB1bHNhci04MnhhN24zZWs1ZHciLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItODJ4YTduM2VrNWR3X2FkbWluIn0.D5H_j8UQk8KYWHw_mzq2HmR393SnbL5Gz7JYCANBPnI",
        "topics": [{
          "topic": "pulsar-82xa7n3ek5dw/atta/atta_topic_1",
          "partitionCnt": 10,
          "topicProperties": {}
        }],
        "cacheZoneProperties": {},
        "zoneType": "pulsar"
      }
    }
  }

Modify configuration file: conf/common.properties

| Parameter | Required | Default value | Remark |
| --- | --- | --- | --- |
| clusterId | Y | NA | inlong-sort-standalone cluster ID |
| nodeId | N | Local IP | Current node ID |
| metricDomains | N | Sort | Metric domain name |
| metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | Class names of metric listeners, separated by spaces |
| metricDomains.Sort.snapshotInterval | N | 60000 | Interval for snapshotting metric data (milliseconds) |
| prometheusHttpPort | N | 8080 | HTTP server port of the Prometheus simple client |
| sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel class name |
| sortSink.type | Y | NA | Sink class name |
| sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
| sortClusterConfig.type | N | manager | Loader source of the cluster configuration data: [file, manager, UserDefinedClassName] |
| sortClusterConfig.file | N | SortClusterConfig.conf | File name in the class resources when sortClusterConfig.type=file |
| sortClusterConfig.managerUrl | N | NA | Cluster configuration URL of InlongManager when sortClusterConfig.type=manager, for example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig |
| sortSourceConfig.QueryConsumeConfigType | N | manager | Loader source of the sort task configuration data: [file, manager, UserDefinedClassName]. When sortSourceConfig.QueryConsumeConfigType=file, the sort task configuration file is ${sortTaskId}.conf in the class resources. |
| sortSourceConfig.managerUrl | N | NA | Sort task configuration URL of InlongManager when sortSourceConfig.QueryConsumeConfigType=manager, for example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource |
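To see how the defaults in the table combine with the values in common.properties, here is a minimal Python sketch that merges a file over those defaults and checks the required keys. It is illustrative only: the names `DEFAULTS`, `parse_properties` and `load_common_properties` are hypothetical, the parser handles only simple key=value lines (not the full Java .properties syntax), nodeId's local-IP default is not computed, and the rule that a manager URL must be present whenever a loader type is manager is an assumption drawn from the table's remarks.

```python
DEFAULTS = {
    "metricDomains": "Sort",
    "metricDomains.Sort.domainListeners":
        "org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener",
    "metricDomains.Sort.snapshotInterval": "60000",
    "prometheusHttpPort": "8080",
    "sortChannel.type": "org.apache.inlong.sort.standalone.channel.BufferQueueChannel",
    "sortSource.type": "org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource",
    "sortClusterConfig.type": "manager",
    "sortClusterConfig.file": "SortClusterConfig.conf",
    "sortSourceConfig.QueryConsumeConfigType": "manager",
}
REQUIRED = ("clusterId", "sortSink.type")


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and #/! comments.

    Simplification: ':' separators, escapes and line continuations
    from the Java .properties format are not handled.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


def load_common_properties(text: str) -> dict:
    """Merge parsed values over the documented defaults and validate them."""
    props = {**DEFAULTS, **parse_properties(text)}
    missing = [key for key in REQUIRED if not props.get(key)]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
    # Assumption: a manager URL is needed whenever a loader type is "manager".
    checks = (
        ("sortClusterConfig.type", "sortClusterConfig.managerUrl"),
        ("sortSourceConfig.QueryConsumeConfigType", "sortSourceConfig.managerUrl"),
    )
    for type_key, url_key in checks:
        if props[type_key] == "manager" and not props.get(url_key):
            raise ValueError(f"{url_key} is required when {type_key}=manager")
    return props
```

With the example file above, both loader types are set to file, so no manager URL is required and unspecified keys such as prometheusHttpPort fall back to their defaults.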

Modify configuration file: SortClusterConfig.conf

  • Cluster configuration data can be loaded from the file SortClusterConfig.conf in the classpath; this mode does not support online updates.
  • Cluster configuration data can also be loaded from the InlongManager URL; this mode supports online updates.

| Parameter | Required | Type | Default value | Remark |
| --- | --- | --- | --- | --- |
| clusterName | Y | String | NA | inlong-sort-standalone cluster ID |
| sortTasks | Y | JsonArray<SortTaskConfig> | NA | Sort task list |

Modify configuration: SortTaskConfig

| Parameter | Required | Default value | Remark |
| --- | --- | --- | --- |
| name | Y | NA | Sort task name |
| type | Y | NA | Sort task type, for example: HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ELASTICSEARCH("elasticsearch"), UNKNOWN("n") |
| idParams | Y | NA | Inlong DataStream configuration |
| sinkParams | Y | NA | Sink parameters of the sort task |
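The two tables above describe the required structure of SortClusterConfig.conf. As a rough pre-flight check before starting the application, the following Python sketch verifies those required fields with the standard json module. The function name `validate_cluster_config` is hypothetical; it only checks presence and JSON types, not the meaning of the values.

```python
import json

TASK_REQUIRED = ("name", "type", "idParams", "sinkParams")


def validate_cluster_config(text: str) -> dict:
    """Check the required fields of SortClusterConfig.conf and its sort tasks."""
    cfg = json.loads(text)
    if not isinstance(cfg.get("clusterName"), str) or not cfg["clusterName"]:
        raise ValueError("clusterName must be a non-empty string")
    tasks = cfg.get("sortTasks")
    if not isinstance(tasks, list):
        raise ValueError("sortTasks must be a JSON array")
    for i, task in enumerate(tasks):
        if not isinstance(task, dict):
            raise ValueError(f"sortTasks[{i}] must be a JSON object")
        missing = [key for key in TASK_REQUIRED if key not in task]
        if missing:
            raise ValueError(f"sortTasks[{i}] is missing {missing}")
        if not isinstance(task["idParams"], list):
            raise ValueError(f"sortTasks[{i}].idParams must be a JSON array")
        if not isinstance(task["sinkParams"], dict):
            raise ValueError(f"sortTasks[{i}].sinkParams must be a JSON object")
    return cfg
```

Passing the contents of the example SortClusterConfig.conf returns the parsed configuration; a task without idParams or sinkParams raises a ValueError naming the missing keys.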

Modify configuration: idParams of Elasticsearch sort task

| Parameter | Required | Default value | Remark |
| --- | --- | --- | --- |
| inlongGroupId | Y | NA | Inlong group ID |
| inlongStreamId | Y | NA | Inlong stream ID |
| separator | Y | NA | Field separator of the Inlong data stream in the data source |
| fieldNames | Y | NA | Field names of the Elasticsearch index, separated by spaces |
| indexNamePattern | Y | NA | Elasticsearch index name pattern; supported date-time variables are {yyyyMMdd}, {yyyyMMddHH} and {yyyyMMddHHmm} |
| contentOffset | Y | NA | Field index offset in the source content |
| fieldOffset | Y | NA | Offset in the Elasticsearch field name list |
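The following Python sketch illustrates these parameters. `resolve_index_name` substitutes the three documented date-time variables of indexNamePattern. `map_fields` shows one plausible reading of contentOffset and fieldOffset that is not confirmed by this document: the first fieldOffset names (ftime and extinfo in the example) are assumed to be filled by the sink itself, and field name i is assumed to take column contentOffset + (i - fieldOffset) of the split record. Both function names are hypothetical, and escaped separators are not handled.

```python
from datetime import datetime

# Documented placeholders and their strftime equivalents.
DATE_PLACEHOLDERS = {
    "{yyyyMMddHHmm}": "%Y%m%d%H%M",
    "{yyyyMMddHH}": "%Y%m%d%H",
    "{yyyyMMdd}": "%Y%m%d",
}


def resolve_index_name(pattern: str, ts: datetime) -> str:
    """Replace the date-time placeholders in indexNamePattern with values from ts."""
    name = pattern
    for placeholder, fmt in DATE_PLACEHOLDERS.items():
        name = name.replace(placeholder, ts.strftime(fmt))
    return name


def map_fields(record: str, separator: str, field_names: str,
               content_offset: int, field_offset: int) -> dict:
    """Map one delimited record onto Elasticsearch field names.

    Assumed semantics: names before field_offset are skipped (set elsewhere),
    and name i reads column content_offset + (i - field_offset).
    Missing trailing columns become empty strings.
    """
    values = record.split(separator)  # literal split, no escape handling
    names = field_names.split()       # fieldNames is space-separated
    doc = {}
    for i in range(field_offset, len(names)):
        column = content_offset + i - field_offset
        doc[names[i]] = values[column] if column < len(values) else ""
    return doc
```

For the example idParams, `resolve_index_name("inlong0fc00000046_{yyyyMMdd}", datetime(2023, 5, 17))` yields inlong0fc00000046_20230517, and under the assumed semantics the record a|b|c maps to t1=a, t2=b, t3=c and an empty t4.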

Modify configuration: sinkParams of Elasticsearch sort task

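| Parameter | Required | Default value | Remark |
| --- | --- | --- | --- |
| httpHosts | Y | NA | Elasticsearch hosts |
| username | Y | NA | Elasticsearch username |
| password | Y | NA | Elasticsearch password |
| isUseIndexId | N | false | Whether to create the index ID |
| bulkSizeMb | N | 10 | Maximum content size per bulk request (MB) |
| flushInterval | N | 60 | Maximum interval between flush operations (seconds) |
| keywordMaxLength | N | 32767 | Maximum keyword length (bytes) |
| bulkAction | N | 4000 | Maximum number of index requests per bulk request |
| maxConnect | N | 10 | Maximum number of open HTTP connections |
| concurrentRequests | N | 5 | Maximum number of concurrent requests per HTTP connection |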
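To make the interplay of bulkAction, bulkSizeMb and flushInterval concrete, here is a hedged Python sketch of a buffer that sends a bulk request as soon as any one of the three limits is reached. This "first threshold wins" policy is an assumption for illustration, not a description of the EsSink internals; concurrentRequests and maxConnect are not modeled. `truncate_keyword` likewise assumes over-long keywords are cut to keywordMaxLength bytes; the real sink may handle them differently. All names are hypothetical.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class BulkBuffer:
    """Hypothetical buffer that decides when to send one bulk request."""

    bulk_action: int = 4000        # bulkAction: max index requests per bulk
    bulk_size_mb: int = 10         # bulkSizeMb: max content size per bulk
    flush_interval_s: float = 60   # flushInterval: max seconds between flushes
    requests: list = field(default_factory=list)
    size_bytes: int = 0
    last_flush: float = field(default_factory=time.monotonic)

    def add(self, doc_json: str) -> bool:
        """Buffer one document; return True if the bulk should be sent now."""
        self.requests.append(doc_json)
        self.size_bytes += len(doc_json.encode("utf-8"))
        return self.should_flush()

    def should_flush(self, now: Optional[float] = None) -> bool:
        """True once any one of the three limits is reached."""
        now = time.monotonic() if now is None else now
        return (
            len(self.requests) >= self.bulk_action
            or self.size_bytes >= self.bulk_size_mb * 1024 * 1024
            or now - self.last_flush >= self.flush_interval_s
        )

    def drain(self, now: Optional[float] = None) -> list:
        """Hand the buffered requests to the caller and reset the counters."""
        batch, self.requests, self.size_bytes = self.requests, [], 0
        self.last_flush = time.monotonic() if now is None else now
        return batch


def truncate_keyword(value: str, max_bytes: int = 32767) -> str:
    """Cut value to at most max_bytes UTF-8 bytes without splitting a character."""
    return value.encode("utf-8")[:max_bytes].decode("utf-8", errors="ignore")
```

With the defaults from the table, a bulk is sent after 4000 documents, 10 MB of content, or 60 seconds, whichever comes first.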

Modify configuration file: sid_es_v3.conf

  • The file name is the sort task name plus the suffix “.conf”.
  • The configuration data can be loaded from the file in the classpath; this mode does not support online updates.
  • The configuration data can also be loaded from the InlongManager URL; this mode supports online updates.
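The file-based loading mode can be sketched in Python as follows: the task file is located as ${sortTaskId}.conf in a configuration directory and parsed as JSON. The helper name `load_task_config` is hypothetical, and the check that the sortTaskId inside the file matches the file name is an assumed consistency rule, not documented behavior.

```python
import json
from pathlib import Path


def load_task_config(conf_dir: Path, sort_task_id: str) -> dict:
    """Load ${sortTaskId}.conf from conf_dir and check it belongs to that task."""
    path = conf_dir / f"{sort_task_id}.conf"
    cfg = json.loads(path.read_text(encoding="utf-8"))
    # Assumption: the file's own sortTaskId should match its file name.
    if cfg.get("sortTaskId") != sort_task_id:
        raise ValueError(f"{path.name} declares sortTaskId={cfg.get('sortTaskId')!r}")
    return cfg
```

For the example in this guide, `load_task_config(Path("conf"), "sid_es_v3")` would read conf/sid_es_v3.conf.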

Modify configuration: sid_es_v3.conf

| Parameter | Required | Type | Default value | Remark |
| --- | --- | --- | --- | --- |
| sortClusterName | Y | String | NA | inlong-sort-standalone cluster ID |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache cluster list, as Map<cacheClusterName, CacheCluster> |

Modify configuration: CacheCluster

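| Parameter | Required | Type | Default value | Remark |
| --- | --- | --- | --- | --- |
| zoneName | Y | String | NA | Cache cluster name |
| zoneType | Y | String | NA | Cache cluster type: [pulsar, tube, kafka] |
| serviceUrl | Y | String | NA | Pulsar serviceUrl or Kafka broker list |
| authentication | Y | String | NA | Pulsar authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache consumer configuration |
| topics | N | List<Topic> | NA | Topic list of the cache consumer |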

Modify configuration: Topic

| Parameter | Required | Type | Default value | Remark |
| --- | --- | --- | --- | --- |
| topic | Y | String | NA | Cache topic name |
| partitionCnt | Y | Integer | NA | Cache topic partition count |
| topicProperties | N | Map<String,String> | NA | Cache topic configuration |
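As a sanity check for the CacheCluster and Topic tables, the following Python sketch walks the cacheZones of an already-parsed sort task configuration, enforces the required fields and the documented zoneType values, and lists every topic with its partition count. The function name `list_topics` is hypothetical, and requiring partitionCnt to be a positive integer is an added assumption.

```python
ZONE_TYPES = {"pulsar", "tube", "kafka"}
ZONE_REQUIRED = ("zoneName", "zoneType", "serviceUrl", "authentication")


def list_topics(task_cfg: dict) -> list:
    """Validate cacheZones and return (zoneName, topic, partitionCnt) tuples."""
    result = []
    for key, zone in task_cfg["cacheZones"].items():
        missing = [k for k in ZONE_REQUIRED if k not in zone]
        if missing:
            raise ValueError(f"cache zone {key!r} is missing {missing}")
        if zone["zoneType"] not in ZONE_TYPES:
            raise ValueError(f"cache zone {key!r} has unsupported zoneType {zone['zoneType']!r}")
        for topic in zone.get("topics", []):  # topics is optional
            count = topic["partitionCnt"]
            # Assumption: a usable partition count is a positive integer.
            if not isinstance(count, int) or count <= 0:
                raise ValueError(f"topic {topic['topic']!r} needs a positive partitionCnt")
            result.append((zone["zoneName"], topic["topic"], count))
    return result
```

Applied to the example sid_es_v3.conf, it returns a single entry for the topic pulsar-82xa7n3ek5dw/atta/atta_topic_1 with 10 partitions in zone pc_atta6th_sz1.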

Start the inlong-sort-standalone application

Finally, run the shell script “./bin/sort-start.sh” to start sort-standalone, then check the log file “sort.log”.