Pulsar2Hive Example

Prepare to get module archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Prepare to modify configuration files

First, decompress the archive file, then copy the three files in the directory "conf/hive/" to the directory "conf/".

  • conf/common.properties, common configuration of all components.
  • conf/SortClusterConfig.conf, sink configuration of all sort tasks.
  • conf/sid_hive_inlong6th_v3.conf, data source configuration example of a sort task; the file name is the same as the sort task name in SortClusterConfig.conf.

Example: conf/common.properties

  1. clusterId=hivev3-sz-sz1
  2. nodeId=nodeId
  3. metricDomains=Sort
  4. metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
  5. metricDomains.Sort.snapshotInterval=60000
  6. sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
  7. sortSink.type=org.apache.inlong.sort.standalone.sink.hive.HiveSink
  8. sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
  9. sortClusterConfig.type=file
  10. sortClusterConfig.file=SortClusterConfig.conf
  11. sortSourceConfig.QueryConsumeConfigType=file
  12. # manager config example
  13. #sortClusterConfig.type=manager
  14. #sortSourceConfig.QueryConsumeConfigType=manager
  15. #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
  16. #sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
  17. #sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

Example: conf/SortClusterConfig.conf

  1. {
  2. "data":{
  3. "clusterName":"hivev3-sz-sz1",
  4. "sortTasks":[
  5. {
  6. "idParams":[
  7. {
  8. "inlongGroupId":"0fc00000046",
  9. "inlongStreamId":"",
  10. "separator":"|",
  11. "partitionIntervalMs":3600000,
  12. "idRootPath":"/user/hive/warehouse/t_inlong_v1_0fc00000046",
  13. "partitionSubPath":"/{yyyyMMdd}/{yyyyMMddHH}",
  14. "hiveTableName":"t_inlong_v1_0fc00000046",
  15. "partitionFieldName":"dt",
  16. "partitionFieldPattern":"yyyyMMddHH",
  17. "msgTimeFieldPattern":"yyyy-MM-dd HH:mm:ss",
  18. "maxPartitionOpenDelayHour":8
  19. },
  20. {
  21. "inlongGroupId":"03600000045",
  22. "inlongStreamId":"",
  23. "separator":"|",
  24. "partitionIntervalMs":3600000,
  25. "idRootPath":"/user/hive/warehouse/t_inlong_v1_03600000045",
  26. "partitionSubPath":"/{yyyyMMdd}/{yyyyMMddHH}",
  27. "hiveTableName":"t_inlong_v1_03600000045",
  28. "partitionFieldName":"dt",
  29. "partitionFieldPattern":"yyyyMMddHH",
  30. "msgTimeFieldPattern":"yyyy-MM-dd HH:mm:ss",
  31. "maxPartitionOpenDelayHour":8
  32. },
  33. {
  34. "inlongGroupId":"05100054990",
  35. "inlongStreamId":"",
  36. "separator":"|",
  37. "partitionIntervalMs":3600000,
  38. "idRootPath":"/user/hive/warehouse/t_inlong_v1_05100054990",
  39. "partitionSubPath":"/{yyyyMMdd}/{yyyyMMddHH}",
  40. "hiveTableName":"t_inlong_v1_05100054990",
  41. "partitionFieldName":"dt",
  42. "partitionFieldPattern":"yyyyMMddHH",
  43. "msgTimeFieldPattern":"yyyy-MM-dd HH:mm:ss",
  44. "maxPartitionOpenDelayHour":8
  45. },
  46. {
  47. "inlongGroupId":"09c00014434",
  48. "inlongStreamId":"",
  49. "separator":"|",
  50. "partitionIntervalMs":3600000,
  51. "idRootPath":"/user/hive/warehouse/t_inlong_v1_09c00014434",
  52. "partitionSubPath":"/{yyyyMMdd}/{yyyyMMddHH}",
  53. "hiveTableName":"t_inlong_v1_09c00014434",
  54. "partitionFieldName":"dt",
  55. "partitionFieldPattern":"yyyyMMddHH",
  56. "msgTimeFieldPattern":"yyyy-MM-dd HH:mm:ss",
  57. "maxPartitionOpenDelayHour":8
  58. },
  59. {
  60. "inlongGroupId":"0c900035509",
  61. "inlongStreamId":"",
  62. "separator":"|",
  63. "partitionIntervalMs":3600000,
  64. "idRootPath":"/user/hive/warehouse/t_inlong_v1_0c900035509",
  65. "partitionSubPath":"/{yyyyMMdd}/{yyyyMMddHH}",
  66. "hiveTableName":"t_inlong_v1_0c900035509",
  67. "partitionFieldName":"dt",
  68. "partitionFieldPattern":"yyyyMMddHH",
  69. "msgTimeFieldPattern":"yyyy-MM-dd HH:mm:ss",
  70. "maxPartitionOpenDelayHour":8
  71. }
  72. ],
  73. "name":"sid_hive_inlong6th_v3",
  74. "sinkParams":{
  75. "hdfsPath":"hdfs://127.0.0.1:9000",
  76. "maxFileOpenDelayMinute":"5",
  77. "tokenOvertimeMinute":"60",
  78. "maxOutputFileSizeGb":"2",
  79. "hiveJdbcUrl":"jdbc:hive2://127.0.0.2:10000",
  80. "hiveDatabase":"default",
  81. "hiveUsername":"hive",
  82. "hivePassword":"hive"
  83. },
  84. "type":"HIVE"
  85. }
  86. ]
  87. },
  88. "errCode":0,
  89. "md5":"md5",
  90. "result":true
  91. }

Example: conf/sid_hive_inlong6th_v3.conf

  1. {
  2. "sortClusterName": "hivev3-sz-sz1",
  3. "sortTaskId": "sid_hive_inlong6th_v3",
  4. "cacheZones": {
  5. "pc_inlong6th_sz1": {
  6. "zoneName": "${PULSAR_CLUSTER_NAME}",
  7. "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
  8. "authentication": "${PULSAR_AUTH}",
  9. "topics": [{
  10. "topic": "${TENANT/NAMESPACE/TOPIC}",
  11. "partitionCnt": 10,
  12. "topicProperties": {}
  13. }],
  14. "cacheZoneProperties": {},
  15. "zoneType": "pulsar"
  16. }
  17. }
  18. }