# Examples

To help you create `inlong-sort` jobs, this page lists some example dataflow configurations.

## Pulsar to Kafka

Normal example:

```json
{
  "id": 1,
  "source_info": {
    "type": "pulsar",
    "admin_url": "YOUR_PULSAR_ADMIN_URL",
    "service_url": "YOUR_PULSAR_SERVICE_URL",
    "topic": "YOUR_PULSAR_TOPIC",
    "subscription_name": "debezium2canal",
    "deserialization_info": {
      "type": "debezium_json",
      "ignore_parse_errors": true,
      "timestamp_format_standard": "ISO_8601"
    },
    "fields": [
      {
        "type": "base",
        "name": "name",
        "format_info": {
          "type": "string"
        }
      },
      {
        "type": "base",
        "name": "age",
        "format_info": {
          "type": "int"
        }
      }
    ],
    "authentication": null
  },
  "sink_info": {
    "type": "kafka",
    "fields": [
      {
        "type": "base",
        "name": "name",
        "format_info": {
          "type": "string"
        }
      },
      {
        "type": "base",
        "name": "age",
        "format_info": {
          "type": "int"
        }
      }
    ],
    "address": "YOUR_KAFKA_ADDRESS",
    "topic": "sort_test_canal",
    "serialization_info": {
      "type": "canal"
    }
  },
  "properties": {
    "consumer.bootstrap-mode": "earliest",
    "transaction.timeout.ms": 900000
  }
}
```

Whole-database migration example:

```json
{
  "id": 123,
  "source_info": {
    "type": "pulsar",
    "admin_url": "YOUR_PULSAR_ADMIN_URL",
    "service_url": "YOUR_PULSAR_SERVICE_URL",
    "topic": "YOUR_PULSAR_TOPIC",
    "subscription_name": "whole-db-migration",
    "deserialization_info": {
      "type": "debezium_json",
      "ignore_parse_errors": false,
      "timestamp_format_standard": "ISO_8601",
      "include_update_before": true
    },
    "fields": [
      {
        "type": "builtin",
        "name": "db",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_DATABASE"
      },
      {
        "type": "builtin",
        "name": "table",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_TABLE"
      },
      {
        "type": "builtin",
        "name": "mydata",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_DATA"
      },
      {
        "type": "builtin",
        "name": "es",
        "format_info": {
          "type": "long"
        },
        "builtin_field": "MYSQL_METADATA_EVENT_TIME"
      },
      {
        "type": "builtin",
        "name": "isDdl",
        "format_info": {
          "type": "boolean"
        },
        "builtin_field": "MYSQL_METADATA_IS_DDL"
      },
      {
        "type": "builtin",
        "name": "type",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_EVENT_TYPE"
      }
    ],
    "authentication": null
  },
  "sink_info": {
    "type": "kafka",
    "fields": [
      {
        "type": "builtin",
        "name": "db",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_DATABASE"
      },
      {
        "type": "builtin",
        "name": "table",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_TABLE"
      },
      {
        "type": "builtin",
        "name": "mydata",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_DATA"
      },
      {
        "type": "builtin",
        "name": "es",
        "format_info": {
          "type": "long"
        },
        "builtin_field": "MYSQL_METADATA_EVENT_TIME"
      },
      {
        "type": "builtin",
        "name": "isDdl",
        "format_info": {
          "type": "boolean"
        },
        "builtin_field": "MYSQL_METADATA_IS_DDL"
      },
      {
        "type": "builtin",
        "name": "type",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "MYSQL_METADATA_EVENT_TYPE"
      }
    ],
    "address": "YOUR_KAFKA_ADDRESS",
    "topic": "whole-db-migration",
    "serialization_info": {
      "type": "canal"
    }
  },
  "properties": {
    "transaction.timeout.ms": 900000,
    "consumer.bootstrap-mode": "earliest"
  }
}
```

## Pulsar to Hive

Normal example:

```json
{
  "id": 123,
  "source_info": {
    "type": "pulsar",
    "admin_url": "YOUR_PULSAR_ADMIN_URL",
    "service_url": "YOUR_PULSAR_SERVICE_URL",
    "topic": "YOUR_PULSAR_TOPIC",
    "subscription_name": "whole-db-migration",
    "deserialization_info": {
      "type": "debezium_json",
      "ignore_parse_errors": false,
      "timestamp_format_standard": "ISO_8601",
      "include_update_before": true
    },
    "fields": [
      {
        "type": "base",
        "name": "f1",
        "format_info": {
          "type": "string"
        }
      },
      {
        "type": "base",
        "name": "f2",
        "format_info": {
          "type": "int"
        }
      },
      {
        "type": "builtin",
        "name": "data_time",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "DATA_TIME"
      }
    ],
    "authentication": null
  },
  "sink_info": {
    "type": "hive",
    "fields": [
      {
        "type": "base",
        "name": "f1",
        "format_info": {
          "type": "string"
        }
      },
      {
        "type": "base",
        "name": "f2",
        "format_info": {
          "type": "int"
        }
      },
      {
        "type": "builtin",
        "name": "data_time",
        "format_info": {
          "type": "string"
        },
        "builtin_field": "DATA_TIME"
      }
    ],
    "hive_server_jdbc_url": "YOUR_HIVE_JDBC_URL",
    "database": "YOUR_HIVE_DB_NAME",
    "table": "YOUR_HIVE_TABLE_NAME",
    "username": "username",
    "password": "password",
    "data_path": "YOUR_HIVE_TABLE_DATA_PATH_ON_HDFS",
    "partitions": [
      {
        "type": "time",
        "field_name": "data_time",
        "date_format": "yyyy-MM-dd"
      },
      {
        "type": "field",
        "field_name": "f2"
      }
    ],
    "file_format": {
      "type": "text",
      "splitter": "|",
      "compression_type": "GZIP"
    },
    "hadoop_proxy_user": "proxyUser"
  },
  "properties": {
    "transaction.timeout.ms": 900000,
    "consumer.bootstrap-mode": "latest"
  }
}
```