Hive Example

Get the module archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/. The archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Prepare the configuration files

First, decompress the archive file, then copy the three files in the directory “conf/hive/” to the directory “conf/”.

  • conf/common.properties, the common configuration of all components.
  • conf/SortClusterConfig.conf, the sink configuration of all sort tasks.
  • conf/sid_hive_inlong6th_v3.conf, an example data source configuration of a sort task. The file name is the sort task name in SortClusterConfig.conf plus the suffix “.conf”.

Example: conf/common.properties

    clusterId=hivev3-sz-sz1
    nodeId=nodeId
    metricDomains=Sort
    metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
    metricDomains.Sort.snapshotInterval=60000
    sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
    sortSink.type=org.apache.inlong.sort.standalone.sink.hive.HiveSink
    sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
    sortClusterConfig.type=file
    sortClusterConfig.file=SortClusterConfig.conf
    sortSourceConfig.QueryConsumeConfigType=file
    # manager config example
    #sortClusterConfig.type=manager
    #sortSourceConfig.QueryConsumeConfigType=manager
    #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
    #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
    #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

Example: conf/SortClusterConfig.conf

    {
      "data": {
        "clusterName": "hivev3-sz-sz1",
        "sortTasks": [
          {
            "idParams": [
              {
                "inlongGroupId": "0fc00000046",
                "inlongStreamId": "",
                "separator": "|",
                "partitionIntervalMs": 3600000,
                "idRootPath": "/user/hive/warehouse/t_inlong_v1_0fc00000046",
                "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
                "hiveTableName": "t_inlong_v1_0fc00000046",
                "partitionFieldName": "dt",
                "partitionFieldPattern": "yyyyMMddHH",
                "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
                "maxPartitionOpenDelayHour": 8
              },
              {
                "inlongGroupId": "03600000045",
                "inlongStreamId": "",
                "separator": "|",
                "partitionIntervalMs": 3600000,
                "idRootPath": "/user/hive/warehouse/t_inlong_v1_03600000045",
                "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
                "hiveTableName": "t_inlong_v1_03600000045",
                "partitionFieldName": "dt",
                "partitionFieldPattern": "yyyyMMddHH",
                "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
                "maxPartitionOpenDelayHour": 8
              },
              {
                "inlongGroupId": "05100054990",
                "inlongStreamId": "",
                "separator": "|",
                "partitionIntervalMs": 3600000,
                "idRootPath": "/user/hive/warehouse/t_inlong_v1_05100054990",
                "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
                "hiveTableName": "t_inlong_v1_05100054990",
                "partitionFieldName": "dt",
                "partitionFieldPattern": "yyyyMMddHH",
                "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
                "maxPartitionOpenDelayHour": 8
              },
              {
                "inlongGroupId": "09c00014434",
                "inlongStreamId": "",
                "separator": "|",
                "partitionIntervalMs": 3600000,
                "idRootPath": "/user/hive/warehouse/t_inlong_v1_09c00014434",
                "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
                "hiveTableName": "t_inlong_v1_09c00014434",
                "partitionFieldName": "dt",
                "partitionFieldPattern": "yyyyMMddHH",
                "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
                "maxPartitionOpenDelayHour": 8
              },
              {
                "inlongGroupId": "0c900035509",
                "inlongStreamId": "",
                "separator": "|",
                "partitionIntervalMs": 3600000,
                "idRootPath": "/user/hive/warehouse/t_inlong_v1_0c900035509",
                "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
                "hiveTableName": "t_inlong_v1_0c900035509",
                "partitionFieldName": "dt",
                "partitionFieldPattern": "yyyyMMddHH",
                "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
                "maxPartitionOpenDelayHour": 8
              }
            ],
            "name": "sid_hive_inlong6th_v3",
            "sinkParams": {
              "hdfsPath": "hdfs://127.0.0.1:9000",
              "maxFileOpenDelayMinute": "5",
              "tokenOvertimeMinute": "60",
              "maxOutputFileSizeGb": "2",
              "hiveJdbcUrl": "jdbc:hive2://127.0.0.2:10000",
              "hiveDatabase": "default",
              "hiveUsername": "hive",
              "hivePassword": "hive"
            },
            "type": "HIVE"
          }
        ]
      },
      "errCode": 0,
      "md5": "md5",
      "result": true
    }

Example: conf/sid_hive_inlong6th_v3.conf

    {
      "sortClusterName": "hivev3-sz-sz1",
      "sortTaskId": "sid_hive_inlong6th_v3",
      "cacheZones": {
        "pc_atta6th_sz1": {
          "zoneName": "pc_atta6th_sz1",
          "serviceUrl": "http://9.139.53.86:8080",
          "authentication": "eyJrZXlJZCI6InB1bHNhci04MnhhN24zZWs1ZHciLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItODJ4YTduM2VrNWR3X2FkbWluIn0.D5H_j8UQk8KYWHw_mzq2HmR393SnbL5Gz7JYCANBPnI",
          "topics": [
            {
              "topic": "pulsar-82xa7n3ek5dw/atta/atta_topic_1",
              "partitionCnt": 10,
              "topicProperties": {}
            }
          ],
          "cacheZoneProperties": {},
          "zoneType": "pulsar"
        }
      }
    }

Modify configuration file: conf/common.properties

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| clusterId | Y | NA | inlong-sort-standalone cluster id |
| nodeId | N | Local IP | Current node id |
| metricDomains | N | Sort | Domain name of metric |
| metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | Class name list of metric listeners, separated by spaces |
| metricDomains.Sort.snapshotInterval | N | 60000 | Interval between metric data snapshots (milliseconds) |
| prometheusHttpPort | N | 8080 | HTTP server port of the Prometheus simple client |
| sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel class name |
| sortSink.type | Y | NA | Sink class name |
| sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
| sortClusterConfig.type | N | manager | Loader source of cluster configuration data: [file,manager,UserDefinedClassName]. |
| sortClusterConfig.file | N | SortClusterConfig.conf | File name in class resource when sortClusterConfig.type=file. |
| sortClusterConfig.managerUrl | N | NA | The cluster configuration URL of InlongManager when sortClusterConfig.type=manager. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig |
| sortSourceConfig.QueryConsumeConfigType | N | manager | Loader source of sort task configuration data: [file,manager,UserDefinedClassName]. When sortSourceConfig.QueryConsumeConfigType=file, the sort task configuration file is ${sortTaskId}.conf in the class resource. |
| sortSourceConfig.managerUrl | N | NA | The sort task configuration URL of InlongManager when sortClusterConfig.type=manager. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource |
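
The required keys and default values listed above can be checked before starting a node. The following is a minimal sketch, not part of InLong: the function names are hypothetical, and the parser only handles plain key=value lines and '#' comments, which is all the example file uses. The nodeId default (local IP) is not a literal value, so it is left out.

```python
# Defaults copied from the parameter table; keys whose default is NA are omitted.
DEFAULTS = {
    "metricDomains": "Sort",
    "metricDomains.Sort.domainListeners":
        "org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener",
    "metricDomains.Sort.snapshotInterval": "60000",
    "prometheusHttpPort": "8080",
    "sortChannel.type": "org.apache.inlong.sort.standalone.channel.BufferQueueChannel",
    "sortSource.type": "org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource",
    "sortClusterConfig.type": "manager",
    "sortClusterConfig.file": "SortClusterConfig.conf",
    "sortSourceConfig.QueryConsumeConfigType": "manager",
}
# Parameters marked Required=Y in the table.
REQUIRED = ("clusterId", "sortSink.type")


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_common_properties(text):
    """Return (effective_properties, problems) for a common.properties body."""
    props = parse_properties(text)
    problems = [f"missing required key: {key}" for key in REQUIRED if key not in props]
    # Explicit settings win over the documented defaults.
    effective = {**DEFAULTS, **props}
    return effective, problems
```

Calling check_common_properties on the content of conf/common.properties returns the settings with defaults filled in, plus a list of missing required keys that is empty when the file is complete.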

Modify configuration file: SortClusterConfig.conf

  • Cluster configuration data can be loaded from the file SortClusterConfig.conf in the classpath; this mode does not support online updating.
  • Cluster configuration data can be loaded from the InlongManager URL; this mode supports online updating.

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| clusterName | Y | String | NA | inlong-sort-standalone cluster id |
| sortTasks | Y | JsonArray<SortTaskConfig> | NA | Sort task list |

Modify configuration: SortTaskConfig

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| name | Y | NA | Sort task name |
| type | Y | NA | Sort task type, for example: HIVE(“hive”), TUBE(“tube”), KAFKA(“kafka”), PULSAR(“pulsar”), ElasticSearch(“ElasticSearch”), UNKNOWN(“n”) |
| idParams | Y | NA | Inlong DataStream configuration |
| sinkParams | Y | NA | Sort task parameters |
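
The required fields of the cluster configuration and of each SortTaskConfig can be verified with a short script. This is a sketch under stated assumptions: check_cluster_config is a hypothetical helper, and falling back to the top-level object when no "data" wrapper is present is a convenience of this sketch rather than documented behavior.

```python
import json

# Fields marked Required=Y for a SortTaskConfig entry.
TASK_REQUIRED = ("name", "type", "idParams", "sinkParams")


def check_cluster_config(text):
    """Return a list of problems found in a SortClusterConfig.conf body."""
    doc = json.loads(text)
    # In the example file the cluster fields sit under a top-level "data" object.
    data = doc.get("data", doc)
    problems = []
    for key in ("clusterName", "sortTasks"):
        if key not in data:
            problems.append(f"missing required field: {key}")
    for index, task in enumerate(data.get("sortTasks", [])):
        for key in TASK_REQUIRED:
            if key not in task:
                problems.append(f"sortTasks[{index}]: missing required field: {key}")
    return problems
```

An empty result means every required field is present; the script does not validate the values themselves.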

Modify configuration: idParams of Hive sort task

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| inlongGroupId | Y | NA | inlongGroupId |
| inlongStreamId | Y | NA | inlongStreamId |
| separator | Y | NA | Separator of the Inlong DataStream in the data source |
| partitionIntervalMs | N | 3600000 | Partition interval (milliseconds) |
| idRootPath | Y | NA | HDFS root path of the Inlong DataStream |
| partitionSubPath | Y | NA | Partition sub path of the Inlong DataStream |
| hiveTableName | Y | NA | Hive table name of the Inlong DataStream |
| partitionFieldName | N | dt | Partition field name of the Inlong DataStream |
| partitionFieldPattern | Y | NA | Date format of the partition field value; the supported patterns are {yyyyMMdd}, {yyyyMMddHH}, {yyyyMMddHHmm} |
| msgTimeFieldPattern | Y | NA | Date format of the message generation time; Java date formats are supported |
| maxPartitionOpenDelayHour | N | 8 | Max delay of a partition (hours) |
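
To make the relationship between idRootPath, partitionSubPath, and the date patterns concrete, the sketch below computes a partition directory and partition value for one message time. It rests on assumptions not confirmed by this page: that each {…} placeholder in partitionSubPath is a date pattern formatted with the message time, and that the result is appended to idRootPath. The function names are hypothetical, only the pattern letters used in the examples (yyyy, MM, dd, HH, mm, ss) are translated, and the actual HiveSink logic may differ.

```python
import re
from datetime import datetime

# Java date-pattern tokens used in this document, mapped to strftime codes.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M", "ss": "%S"}
_TOKEN = re.compile("|".join(_JAVA_TO_STRFTIME))


def to_strftime(java_pattern):
    """Translate a Java date pattern into a strftime format in a single pass."""
    return _TOKEN.sub(lambda m: _JAVA_TO_STRFTIME[m.group(0)], java_pattern)


def partition_location(id_params, msg_time_text):
    """Return (hdfs_directory, partition_value) for one message time string.

    Assumption: placeholders in partitionSubPath are resolved against the
    message time; this is an illustration, not the sink's actual code.
    """
    msg_time = datetime.strptime(msg_time_text, to_strftime(id_params["msgTimeFieldPattern"]))
    sub_path = re.sub(
        r"\{([^{}]+)\}",
        lambda m: msg_time.strftime(to_strftime(m.group(1))),
        id_params["partitionSubPath"],
    )
    value = msg_time.strftime(to_strftime(id_params["partitionFieldPattern"]))
    return id_params["idRootPath"] + sub_path, value
```

With the first idParams entry of the example configuration and the message time 2022-03-01 13:45:10, this yields the directory /user/hive/warehouse/t_inlong_v1_0fc00000046/20220301/2022030113 and the partition value 2022030113.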

Modify configuration: sinkParams of Hive sort task

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| hdfsPath | Y | NA | NameNode URL of HDFS |
| maxFileOpenDelayMinute | N | 5 | Max writing delay of a simple HDFS file (minutes) |
| tokenOvertimeMinute | N | 60 | Token overtime of the Inlong DataStream (minutes) |
| maxOutputFileSizeGb | N | 2 | Max file size of a simple HDFS file (GB) |
| hiveJdbcUrl | Y | NA | JDBC URL of Hive |
| hiveDatabase | Y | NA | Hive database |
| hiveUsername | Y | NA | Hive username |
| hivePassword | Y | NA | Hive password |

Modify configuration file: sid_hive_inlong6th_v3.conf

  • The file name consists of the sort task name plus the suffix “.conf”.
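
Because the source configuration file is located by task name, a mismatch between SortClusterConfig.conf and the files in conf/ is easy to introduce. A minimal sketch that lists the missing files, assuming file-based configuration and the layout of the example above (missing_task_files is a hypothetical helper):

```python
import json
from pathlib import Path


def missing_task_files(conf_dir):
    """Return the <task name>.conf files that SortClusterConfig.conf expects but conf_dir lacks."""
    conf_dir = Path(conf_dir)
    doc = json.loads((conf_dir / "SortClusterConfig.conf").read_text(encoding="utf-8"))
    # In the example file the task list sits under a top-level "data" object.
    tasks = doc.get("data", doc).get("sortTasks", [])
    return [
        f"{task['name']}.conf"
        for task in tasks
        if not (conf_dir / f"{task['name']}.conf").is_file()
    ]
```

For the example configuration, the result is empty only when conf/sid_hive_inlong6th_v3.conf exists.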

Modify configuration: sid_hive_inlong6th_v3.conf

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| sortClusterName | Y | String | NA | inlong-sort-standalone cluster id |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache cluster list, Map<cacheClusterName, CacheCluster> |

Modify configuration: CacheCluster

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| zoneName | Y | String | NA | Cache cluster name |
| zoneType | Y | String | NA | [pulsar,tube,kafka] |
| serviceUrl | Y | String | NA | Pulsar serviceUrl or Kafka broker list |
| authentication | Y | String | NA | Pulsar authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache consumer configuration |
| topics | N | List<Topic> | NA | Topic list of the cache consumer |

Modify configuration: Topic

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| topic | Y | String | NA | Cache topic name |
| partitionCnt | Y | Integer | NA | Cache topic partition count |
| topicProperties | N | Map<String,String> | NA | Cache topic configuration |
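
The nesting of cacheZones, CacheCluster, and Topic can be walked with a few lines of code. The sketch below lists every topic a sort task would consume and rejects zone types outside the documented set; list_topics is a hypothetical helper, not an InLong API.

```python
import json

# Zone types listed in the CacheCluster table.
ZONE_TYPES = {"pulsar", "tube", "kafka"}


def list_topics(text):
    """Return (zone name, topic, partitionCnt) tuples from a sort task source configuration."""
    doc = json.loads(text)
    rows = []
    for zone_name, zone in doc["cacheZones"].items():
        if zone["zoneType"] not in ZONE_TYPES:
            raise ValueError(f"{zone_name}: unsupported zoneType {zone['zoneType']!r}")
        # topics is optional, so a zone without it contributes no rows.
        for topic in zone.get("topics", []):
            rows.append((zone_name, topic["topic"], topic["partitionCnt"]))
    return rows
```

For the example file sid_hive_inlong6th_v3.conf, the result is a single row for the zone pc_atta6th_sz1 with the topic pulsar-82xa7n3ek5dw/atta/atta_topic_1 and 10 partitions.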