Command-line Tools

Overview

In addition to the InLong Dashboard, you can use the command-line tool to view and create data Groups and Streams.

  Usage: managerctl [options] [command] [command options]
    Options:
      -h, --help
        Get all command about managerctl.
    Commands:
      create      Create resource by json file
        Usage: create [options]
      describe    Display details of one or more resources
        Usage: describe [options]
      list        Displays main information for one or more resources
        Usage: list [options]
      delete      Deletes the inlong group corresponding to the given id
        Usage: managerctl delete [command]
      update      Updates the inlong group corresponding to the given id
        Usage: managerctl update [command] [command options]
      log         Filters out inlong groups according to its properties
        Usage: managerctl log [command] [command options]

The command-line tool currently supports six commands: list, describe, create, log, update, and delete.

Configuration

Go to the inlong-manager directory and modify the following configuration in the conf/application.properties file.

  server.host=127.0.0.1
  server.port=8080
  default.admin.user=admin
  default.admin.password=inlong
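These are plain key=value properties. As an illustrative sketch (the `parse_properties` helper below is not part of InLong), such a file can be read from Python:

```python
# Minimal parser for simple key=value .properties files, such as
# inlong-manager's conf/application.properties.
# parse_properties is an illustrative helper, not part of InLong.
def parse_properties(text: str) -> dict:
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

sample = """\
server.host=127.0.0.1
server.port=8080
default.admin.user=admin
default.admin.password=inlong
"""
config = parse_properties(sample)
print(config["server.host"])  # → 127.0.0.1
```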

List

  Usage: managerctl list [command] [command options]
    Commands:
      stream      Get stream main information
        Usage: stream [options]
          Options:
          * -g, --group
              inlong group id
      group       Get group details
        Usage: group [options]
          Options:
            -g, --group
              inlong group id
            -n, --num
              the number displayed
              Default: 10
            -s, --status
              ( CREATE | REJECTED | INITIALIZING | OPERATING |
                STARTED | FAILED | STOPPED | FINISHED | DELETED )
      sink        Get sink details
        Usage: sink [options]
          Options:
          * -g, --group
              group id
          * -s, --stream
              stream id
      source      Get source details
        Usage: source [options]
          Options:
          * -g, --group
              inlong group id
          * -s, --stream
              inlong stream id
            -t, --type
              sink type
Options marked with * are required.

The list command displays the main information of inlong groups / streams / sinks / sources.

Describe

  Usage: managerctl describe [command] [command options]
    Commands:
      stream      Get stream details
        Usage: stream [options]
          Options:
          * -g, --group
              inlong group id
      group       Get group details
        Usage: group [options]
          Options:
            -g, --group
              inlong group id
            -n, --num
              the number displayed
              Default: 10
            -s, --status
              Default: 0
      sink        Get sink details
        Usage: sink [options]
          Options:
          * -g, --group
              inlong group id
          * -s, --stream
              inlong stream id
      source      Get source details
        Usage: source [options]
          Options:
          * -g, --group
              inlong group id
          * -s, --stream
              inlong stream id
          * -t, --type
              sink type

The describe command displays the details of inlong groups / streams / sinks / sources, and outputs them in JSON format.

Create

  Usage: managerctl create [command] [command options]
    Commands:
      group       Create group by json file
        Usage: group [options]
          Options:
          * -f, --file
              json file

The create command requires no application or approval steps; simply prepare the configuration in a JSON file.

JSON File

The JSON file consists of five main parts: groupConf, streamConf, streamSource, streamSink, and streamFieldList.

  {
    "groupConf": {
      "groupName": "test_group",
      "description": "",
      "proxyClusterId": "1",
      "mqBaseConf": {
        "type": "PULSAR",
        "pulsarServiceUrl": "pulsar://127.0.0.1:6650",
        "pulsarAdminUrl": "http://127.0.0.1:8080",
        "tenant": "tenant",
        "namespace": "namespace",
        "enableCreateResource": false
      },
      "sortBaseConf": {
        "type": "FLINK",
        "serviceUrl": "127.0.0.1:8081"
      },
      "zookeeperEnabled": false,
      "dailyRecords": 10000000,
      "peakRecords": 100000,
      "maxLength": 10000
    },
    "streamConf": {
      "name": "test_stream",
      "description": "",
      "dataSeparator": "|",
      "strictlyOrdered": true,
      "topic": "topic"
    },
    "streamSource": {
      "sourceType": "KAFKA",
      "bootstrapServers": "127.0.0.1:9092",
      "topic": "kafka_topic",
      "sourceName": "kafka_sourceName",
      "dataFormat": "json",
      "autoOffsetReset": "EARLIEST"
    },
    "streamSink": {
      "sinkType": "HIVE",
      "dbName": "test_db",
      "jdbcUrl": "jdbc:hive2://127.0.0.1:10000",
      "authentication": {
        "userName": "hive",
        "password": "hive"
      },
      "fileFormat": "TextFile",
      "dataSeparator": "|",
      "dataPath": "hdfs://127.0.0.1:9000/user/hive/warehouse/test_db",
      "sinkFields": [
        {
          "id": 0,
          "fieldType": "STRING",
          "fieldName": "name",
          "sourceFieldType": "STRING",
          "sourceFieldName": "name"
        }
      ],
      "tableName": "test_table",
      "sinkName": "test",
      "dataFormat": "json"
    },
    "streamFieldList": [
      {
        "id": 0,
        "fieldType": "STRING",
        "fieldName": "name",
        "fieldComment": null,
        "fieldValue": null
      }
    ]
  }
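Before submitting the file with create, it can be worth checking that all five sections are present. A minimal sketch, assuming nothing beyond the structure shown above (`check_sections` is an illustrative helper, not part of InLong):

```python
import json

# Illustrative validation helper (not part of InLong): verify that a
# create-group JSON file contains the five expected top-level sections.
REQUIRED_SECTIONS = (
    "groupConf", "streamConf", "streamSource", "streamSink", "streamFieldList",
)

def check_sections(payload: dict) -> list:
    """Return the list of missing top-level sections."""
    return [s for s in REQUIRED_SECTIONS if s not in payload]

# A trimmed-down payload with all five sections present.
doc = json.loads("""
{
  "groupConf": {"groupName": "test_group"},
  "streamConf": {"name": "test_stream"},
  "streamSource": {"sourceType": "KAFKA"},
  "streamSink": {"sinkType": "HIVE"},
  "streamFieldList": []
}
""")
print(check_sections(doc))  # → []
```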

streamSource

  • Kafka

    "streamSource": {
      "sourceType": "KAFKA",
      "sourceName": "sourceName",
      "bootstrapServers": "127.0.0.1:9092",
      "topic": "kafka_topic",
      "dataFormat": "json",
      "autoOffsetReset": "EARLIEST"
    }
  • MySQL Binlog

    "streamSource": {
      "sourceType": "BINLOG",
      "sourceName": "sourceName",
      "hostname": "127.0.0.1",
      "port": "3306",
      "authentication": {
        "userName": "root",
        "password": "root"
      },
      "includeSchema": false,
      "serverTimezone": "UTC",
      "monitoredDdl": false,
      "allMigration": false,
      "dbNames": ["db1", "test_db*"],
      "tableNames": ["tb1", "user*"]
    }
  • File

    "streamSource": {
      "sourceType": "FILE",
      "sourceName": "sourceName",
      "ip": "127.0.0.1",
      "pattern": "/a/b/*.txt",
      "timeOffset": "-1d"
    }

streamSink

  • Hive

    "streamSink": {
      "sinkType": "HIVE",
      "dbName": "test_db",
      "jdbcUrl": "jdbc:hive2://127.0.0.1:10000",
      "authentication": {
        "userName": "hive",
        "password": "hive"
      },
      "fileFormat": "TextFile",
      "dataSeparator": "|",
      "dataPath": "hdfs://127.0.0.1:9000/user/hive/warehouse/test_db",
      "tableName": "test_table",
      "sinkName": "test",
      "dataFormat": "json"
    }
  • Kafka

    "streamSink": {
      "sinkType": "KAFKA",
      "bootstrapServers": "127.0.0.1:9092",
      "topicName": "test_topic",
      "dataFormat": "JSON",
      "boolean": false
    }
  • ClickHouse

    "streamSink": {
      "sinkType": "CLICKHOUSE",
      "sinkName": "sinkName",
      "dbName": "db_test",
      "tableName": "table_test",
      "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/db",
      "authentication": {
        "userName": "default",
        "password": "default"
      },
      "isDistributed": 1,
      "flushInterval": 1,
      "flushRecord": 10,
      "keyFieldNames": "keyField",
      "partitionFields": "partition",
      "partitionStrategy": "BALANCE",
      "retryTimes": 3,
      "needCreated": false
    }
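Each sink type above requires a different set of keys. As an illustrative sketch, a pre-flight check can be derived from these examples (the required-key lists below are inferred from the fragments above, not an official InLong schema):

```python
# Illustrative sketch: per-sinkType required keys, inferred from the
# streamSink examples above (not an official InLong schema).
SINK_REQUIRED = {
    "HIVE": {"dbName", "jdbcUrl", "authentication"},
    "KAFKA": {"bootstrapServers", "topicName"},
    "CLICKHOUSE": {"dbName", "tableName", "jdbcUrl", "authentication"},
}

def missing_sink_keys(sink: dict) -> set:
    """Return the keys a streamSink fragment still needs for its sinkType."""
    required = SINK_REQUIRED.get(sink.get("sinkType"), set())
    return required - sink.keys()

ch_sink = {
    "sinkType": "CLICKHOUSE",
    "dbName": "db_test",
    "tableName": "table_test",
    "jdbcUrl": "jdbc:clickhouse://127.0.0.1:8123/db",
}
print(missing_sink_keys(ch_sink))  # → {'authentication'}
```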

Delete

  Usage: managerctl delete [command]
    Commands:
      group       The id of the inlong group that is to be deleted
        Usage: group

The delete command deletes the task instance corresponding to the given task id.

Update

  Usage: managerctl update [command] [command options]
    Commands:
      group       The id of the inlong group that is to be updated
        Usage: group [options]
          Options:
          * -c, --config
              the config file as json

The update command selects a task instance and updates it with the sortconf JSON file specified via --config.

  • Sortconf JSON example

    "FlinkSortConf": {
      "sortType": "flink",
      "authentication": "NONE",
      "serviceUrl": "127.0.0.1:8123",
      "region": "beijing",
      "properties": {}
    }
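A sketch of generating such a file with Python's standard json module before passing it to update via --config (the field values mirror the example above; the file path is arbitrary):

```python
import json
import os
import tempfile

# Sketch: write a FlinkSortConf update payload to a JSON file suitable
# for `managerctl update group -c <file>`. Values mirror the example
# above; the output path here is arbitrary.
sort_conf = {
    "FlinkSortConf": {
        "sortType": "flink",
        "authentication": "NONE",
        "serviceUrl": "127.0.0.1:8123",
        "region": "beijing",
        "properties": {},
    }
}

path = os.path.join(tempfile.gettempdir(), "sortconf.json")
with open(path, "w") as f:
    json.dump(sort_conf, f, indent=2)

# Round-trip to confirm the file is valid JSON.
with open(path) as f:
    loaded = json.load(f)
print(loaded["FlinkSortConf"]["sortType"])  # → flink
```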

Log

  Usage: managerctl log [command] [command options]
    Commands:
      group       Get group details
        Usage: group [options]
          Options:
          * -q, --query [parameter:value]
              select the list of groups according to one selected query.
              Supported filters:
                inlongGroupId
                name (Inlong group name)
                mqType
                mqResource
                inlongClusterTag
                inCharges
                status
                creator
                modifier
                createTime
                modifyTime

The log command requires a filter parameter; when the number of existing tasks is small, it prints all task instances that satisfy the filter.