Elasticsearch 示例

准备安装文件

安装文件在inlong-sort-standalone/sort-standalone-dist/target/目录下,文件名是apache-inlong-sort-standalone-${project.version}-bin.tar.gz。

准备修改配置文件

首先,解压压缩包apache-inlong-sort-standalone-${project.version}-bin.tar.gz,然后从目录”conf/es/“下拷贝3个文件到目录”conf/“。

  • conf/common.properties,所有组件的基本配置参数
  • conf/SortClusterConfig.conf,所有Sort任务的sink配置。, sink configuration of all sort tasks.
  • conf/sid_es_v3.conf,一个Sort任务的数据源配置,文件名和配置文件SortClusterConfig.conf中的Sort任务名一致,如果SortClusterConfig.conf中配置了多个Sort任务,那么会有多个Sort任务的数据源配置。

conf/common.properties配置样例

  1. clusterId=esv3-sz-sz1
  2. nodeId=nodeId
  3. metricDomains=Sort
  4. metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
  5. metricDomains.Sort.snapshotInterval=60000
  6. sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
  7. sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
  8. sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
  9. sortClusterConfig.type=file
  10. sortClusterConfig.file=SortClusterConfig.conf
  11. sortSourceConfig.QueryConsumeConfigType=file
  12. #sortTaskId.conf
  13. #sortClusterConfig.type=manager
  14. #sortSourceConfig.QueryConsumeConfigType=manager
  15. #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
  16. #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
  17. #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

conf/SortClusterConfig.conf配置样例

  1. {
  2. "clusterName": "esv3-gz-gz1",
  3. "sortTasks": [{
  4. "name": "sid_es_v3",
  5. "type": "ES",
  6. "idParams": [{
  7. "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
  8. "contentOffset": "0",
  9. "inlongGroupId": "atta",
  10. "fieldOffset": "2",
  11. "fieldNames": "ftime extinfo t1 t2 t3 t4",
  12. "inlongStreamId": "0fc00000046",
  13. "separator": "|"
  14. }],
  15. "sinkParams": {
  16. "httpHosts": "11.187.135.221:9200",
  17. "password": "yingyan@ES",
  18. "auditSetName": "es-rmrv7g7a",
  19. "bulkSizeMb": "10",
  20. "flushInterval": "60",
  21. "keywordMaxLength": "32767",
  22. "bulkAction": "4000",
  23. "concurrentRequests": "5",
  24. "maxConnect": "10",
  25. "isUseIndexId": "false",
  26. "username": "elastic"
  27. }
  28. }]
  29. }

conf/sid_es_v3.conf配置样例

  1. {
  2. "sortClusterName": "esv3-gz-gz1",
  3. "sortTaskId": "sid_es_v3",
  4. "cacheZones": {
  5. "pc_atta6th_sz1": {
  6. "zoneName": "pc_atta6th_sz1",
  7. "serviceUrl": "http://9.139.53.86:8080",
  8. "authentication": "eyJrZXlJZCI6InB1bHNhci04MnhhN24zZWs1ZHciLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItODJ4YTduM2VrNWR3X2FkbWluIn0.D5H_j8UQk8KYWHw_mzq2HmR393SnbL5Gz7JYCANBPnI",
  9. "topics": [{
  10. "topic": "pulsar-82xa7n3ek5dw/atta/atta_topic_1",
  11. "partitionCnt": 10,
  12. "topicProperties": {}
  13. }],
  14. "cacheZoneProperties": {},
  15. "zoneType": "pulsar"
  16. }
  17. }
  18. }

conf/common.properties配置参数

参数名是否必须默认值描述
clusterIdYNA用来唯一标识一个inlong-sort-standalone集群
nodeIdN本机IP当前节点ID
metricDomainsNSort指标汇总域名
metricDomains.Sort.domainListenersNorg.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener指标汇总监听器类名列表,空格分隔
metricDomains.Sort.snapshotIntervalN60000订阅tube的重试超时时间,单位为ms
prometheusHttpPortN8080org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener的参数,Prometheus的HttpServer端口
sortChannel.typeNorg.apache.inlong.sort.standalone.channel.BufferQueueChannelChannel类型
sortSink.typeYNASink类名,不同的分发类型使用不同的Sink类
sortSource.typeNorg.apache.inlong.sort.standalone.source.sortsdk.SortSdkSourceSource类名
sortClusterConfig.typeNmanager集群配置数据的加载来源,有三种方式:[文件,Manager,自定义类]。
sortClusterConfig.fileNSortClusterConfig.conf当集群配置数据加载来源是file时,在类路径下的配置文件名
sortClusterConfig.managerUrlNNA集群配置数据加载来源是manager时,这里定义InlongManager的URL
比如:http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig
sortSourceConfig.QueryConsumeConfigTypeNmanagerSort任务配置数据的加载来源,有三种方式:[文件,Manager,自定义类]。
如果加载路径是file的话,Sort任务配置文件是在类路径里,文件名的格式:${sortTaskId}.conf。
sortSourceConfig.managerUrlNNASort任务配置数据加载来源是manager时,这里定义InlongManager的URL
比如::http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource

SortClusterConfig.conf配置参数

  • 可以从ClassPath的SortClusterConfig.conf源文件读取,但不支持实时更新
  • 可以从Inlong Manager的HTTP接口获取配置,支持实时更新
参数名是否必须类型默认值描述
clusterNameYStringNA用来唯一标识一个inlong-sort-standalone集群
sortTasksYJsonArray<SortTaskConfig>NASort任务列表

SortTaskConfig配置参数

参数名是否必须默认值描述
nameYNASort任务名
typeYNASort任务类型,如:HIVE(“hive”), TUBE(“tube”), KAFKA(“kafka”), PULSAR(“pulsar”), ELASTICSEARCH(“elasticsearch”), UNKNOWN(“n”)
idParamsYNAInlong数据流参数列表
sinkParamsYNASort任务参数

Sort-Elasticsearch任务的idParams配置参数

参数名是否必须默认值描述
inlongGroupIdYNAinlongGroupId
inlongStreamIdYNAinlongStreamId
separatorYNA分隔符
fieldNamesYNAElasticsearch的Index字段列表,用空格分隔
indexNamePatternYNAIndex的名字模板,支持三种日期时间格式变量:{yyyyMMdd},{yyyyMMddHH},{yyyyMMddHHmm}
contentOffsetYNA源数据的有效字段开始偏移,从0开始
fieldOffsetYNAElasticsearch的Index字段列表的开始偏移

Sort-Elasticsearch任务的sinkParams配置参数

参数名是否必须默认值描述
httpHostsYNAElasticsearch的Host的IP端口
usernameYNAElasticsearch用户名
passwordYNAElasticsearch密码
isUseIndexIdNfalse是否创建IndexId,影响Index分片分布
bulkSizeMbN10单Bulk的最大大小,单位MB
flushIntervalN60刷盘间隔,单位是秒
keywordMaxLengthN32767单个keyword最大长度,单位是字节
bulkActionN4000单个Bulk的最大IndexRequest数
maxConnectN10最大HTTP连接数
concurrentRequestsN5单个HTTP连接的最大等待请求数

Sort-Elasticsearch任务的sid_es_v3.conf配置参数

  • 文件名格式:Sort任务名+”.conf”。
  • 可以从ClassPath的SortClusterConfig.conf源文件读取,但不支持实时更新。
  • 可以从Inlong Manager的HTTP接口获取配置,支持实时更新。

sid_es_v3.conf配置参数

参数名是否必须类型默认值描述
sortClusterNameYStringNA用来唯一标识一个inlong-sort-standalone集群
sortTaskIdYStringNASort任务名
cacheZonesYJsonObject<String, JsonObject>NA缓存层集群列表,格式:Map<cacheClusterName, CacheCluster>

CacheCluster配置参数

参数名是否必须类型默认值描述
zoneNameYStringNA缓存层集群名
zoneTypeYStringNA缓存类型:[pulsar,tube,kafka]
serviceUrlYStringNAPulsar的serviceUrl参数,或者Kafka的Broker列表
authenticationYStringNAPulsar鉴权
cacheZonePropertiesNMap<String,String>NA缓存层Consumer参数
topicsNList<Topic>NA缓存层消费的Topic列表

Topic配置参数

参数名是否必须类型默认值描述
topicYStringNATopic完整名,Pulsar:tenant/namespace/topic
partitionCntYIntegerNATopic分区数
topicPropertiesNMap<String,String>NA缓存层Topic的Consumer参数

启动inlong-sort-standalone应用

最后,执行脚本”./bin/sort-start.sh”,启动sort-standalone应用,之后可以检查日志文件sort.log,确认启动情况。