命令行工具

总览

除了 InLong Dashboard,你可以通过命令行工具来查看管理 InLong 的相关资源。

命令行工具可以在 InLong 的 bin 目录运行。

用法:

  1. $ bin/inlongctl [options] [command] [command options]

命令:

  • list
  • describe
  • create
  • update
  • delete
  • log

同时也可以使用 --help 或者 -h 获取上述命令的帮助,例如:

  1. $ bin/inlongctl list -h

配置

前往 inlong-manager 目录,修改 conf/application.properties 文件的以下配置即可。

  1. server.host=127.0.0.1
  2. server.port=8080
  3. default.admin.user=admin
  4. default.admin.password=inlong

List

list 用于展示相关资源的核心信息,并以表格的方式展示。

命令:

  • group
  • stream
  • source
  • sink
  • cluster
  • cluster-tag
  • cluster-node
  • user

group

  1. $ bin/inlongctl list group

选项:

参数描述默认值
-g, —groupinlong group id,支持模糊查询
-s, —statusinlong group status ,可选值有:CREATEREJECTEDINITIALIZINGOPERATING
STARTEDFAILEDSTOPPEDFINISHEDDELETED
-n, —num最多显示条数10

group status 说明

group status描述
CREATE待提交、待审批状态
REJECTED审批被驳回
INITIALIZING配置中
OPERATING删除中、停止中以及重启中
STARTED配置成功以及重启成功
FAILED配置失败
STOPPED暂停
FINISHED停止
DELETED已删除

stream

  1. $ bin/inlongctl list stream

选项:

参数描述默认值
-g, —group *inlong group id,该 inlong stream 所在的 inlong group

* 表示为必需参数

source

  1. $ bin/inlongctl list source

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id
-t, —typestream source type,可选值有:AUTO_PUSH, TUBEMQ, PULSAR, KAFKA, FILE, MYSQL_SQL,
MYSQL_BINLOG, POSTGRESQL, ORACLE, SQLSERVER, MONGODB, REDIS

stream source type 说明

stream source type描述
AUTO_PUSH自主推送
TUBEMQTubeMQ
PULSARPulsar
KAFKAKafka
FILE文件
MYSQL_SQLSQL
MYSQL_BINLOGBinlog
POSTGRESQLPostgreSQL
ORACLEOracle
SQLSERVERSQL server
MONGODBMongoDB
REDISRedis

sink

  1. $ bin/inlongctl list sink

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id,该 stream sink 所在的 inlong stream

cluster-tag

  1. $ bin/inlongctl list cluster-tag

选项:

参数描述默认值
—tagcluster tag,集群标签,支持模糊查询

cluster

  1. $ bin/inlongctl list cluster

选项:

参数描述默认值
—tagcluster tag
—typecluster type,可选值有:AGENT, TUBEMQ, PULSAR, DATAPROXY, KAFKA

cluster type 说明

cluster type描述
AGENTAgent
TUBEMQTubeMQ
PULSARPulsar
DATAPROXYDataProxy
KAFKAKafka

cluster-node

  1. $ bin/inlongctl list cluster-node

选项:

参数描述默认值
—tag *cluster tag
—typecluster type,可选值有:AGENT, TUBEMQ, PULSAR, DATAPROXY, KAFKA

user

  1. $ bin/inlongctl list user

选项:

参数描述默认值
-u, —usernameusername,支持模糊查询
—typeuser type,可选值有:ADMIN, OPERATOR

user type 说明

user type描述
ADMIN管理员
OPERATOR普通用户

Describe

describe 用于展示相关资源的详细信息,直接以 Json 格式展示。

命令:

  • group
  • stream
  • source
  • sink
  • cluster
  • cluster-tag
  • cluster-node
  • user

group

  1. $ bin/inlongctl describe group

选项:

参数描述默认值
-g, —groupinlong group id,支持模糊查询
-s, —statusinlong group status ,可选值有:CREATEREJECTEDINITIALIZINGOPERATING
STARTEDFAILEDSTOPPEDFINISHEDDELETED
-n, —num最多显示条数10

group status 说明

group status描述
CREATE待提交、待审批状态
REJECTED审批被驳回
INITIALIZING配置中
OPERATING删除中、停止中以及重启中
STARTED配置成功以及重启成功
FAILED配置失败
STOPPED暂停
FINISHED停止
DELETED已删除

stream

  1. $ bin/inlongctl describe stream

选项:

参数描述默认值
-g, —group *inlong group id,该 inlong stream 所在的 inlong group

source

  1. $ bin/inlongctl describe source

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id
-t, —typestream source type,可选值有:AUTO_PUSH, TUBEMQ, PULSAR, KAFKA, FILE, MYSQL_SQL,
MYSQL_BINLOG, POSTGRESQL, ORACLE, SQLSERVER, MONGODB, REDIS

stream source type 说明

stream source type描述
AUTO_PUSH自主推送
TUBEMQTubeMQ
PULSARPulsar
KAFKAKafka
FILE文件
MYSQL_SQLSQL
MYSQL_BINLOGBinlog
POSTGRESQLPostgreSQL
ORACLEOracle
SQLSERVERSQL server
MONGODBMongoDB
REDISRedis

sink

  1. $ bin/inlongctl describe sink

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id,该 stream sink 所在的 inlong stream

cluster-tag

  1. $ bin/inlongctl describe cluster-tag

选项:

参数描述默认值
-id, —id *cluster tag id

cluster

  1. $ bin/inlongctl describe cluster

选项:

参数描述默认值
-id, —id *cluster id

cluster-node

  1. $ bin/inlongctl describe cluster-node

选项:

参数描述默认值
-id, —id *cluster node id

user

  1. $ bin/inlongctl describe user

选项:

参数描述默认值
-id, —id *user id

Create

create 用于创建相关资源,目前通过使用 json 文件的方式创建

命令:

  • group
  • cluster
  • cluster-tag
  • cluster-node
  • user

group

  1. $ bin/inlongctl create group

选项:

参数描述默认值
-f, —filejson 文件名称

json:

  1. {
  2. "groupInfo": {
  3. "inlongGroupId": "test_group_ctl",
  4. "inlongClusterTag": "default_cluster",
  5. "mqType": "PULSAR"
  6. },
  7. "streamInfo": {
  8. "inlongStreamId": "test_stream_ctl",
  9. "fieldList": [
  10. {
  11. "fieldName": "name",
  12. "fieldType": "string"
  13. }
  14. ],
  15. "sourceList": [
  16. {
  17. "sourceType": "FILE",
  18. "sourceName": "test_source_ctl",
  19. "agentIp": "127.0.0.1",
  20. "pattern": "/data/test.txt"
  21. }
  22. ],
  23. "sinkList": [
  24. {
  25. "sinkType": "CLICKHOUSE",
  26. "sinkName": "test_sink_ctl",
  27. "dataNodeName": "test_clickhouse",
  28. "dbName": "db_test",
  29. "tableName": "table_test",
  30. "flushInterval": 1,
  31. "flushRecord": 1000,
  32. "retryTimes": 3,
  33. "engine": "Log",
  34. "isDistributed": 1,
  35. "sinkFieldList": [
  36. {
  37. "sourceFieldName": "name",
  38. "sourceFieldType": "string",
  39. "fieldName": "name",
  40. "fieldType": "string"
  41. }
  42. ]
  43. }
  44. ]
  45. }
  46. }
  • 这是一个 filepulsarclickhouse 流向的示例,如果需要其它流向,只需要替换相应 sourcesink 部分

Source:

  • File
  • PostgreSQL
  • MySQL
  • SQLServer
  • MongoDB
  • Redis
  • Oracle
  • MQTT
  1. {
  2. "sourceType": "FILE",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "pattern": "/data/test.txt"
  6. }
  1. {
  2. "sourceType": "MYSQL_BINLOG",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "user": "username",
  6. "password": "password",
  7. "hostname": "127.0.0.1",
  8. "port": 3306,
  9. "tableWhiteList": "db_test.table_test",
  10. "serverTimezone": "UTC",
  11. "intervalMs": "1000",
  12. "historyFilename": "/data/inlong-agent/.history",
  13. "allMigration": false
  14. }
  1. {
  2. "sourceType": "MONGODB",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "hosts": "127.0.0.1:27017",
  6. "username": "username",
  7. "password": "password",
  8. "database": "database_test",
  9. "collection": "collection_test",
  10. "primaryKey": "_id"
  11. }
  1. {
  2. "sourceType": "SQLSERVER",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "username": "username",
  6. "password": "password",
  7. "hostname": "127.0.0.1",
  8. "port": 1433,
  9. "database": "database_test",
  10. "schemaName": "schema_test",
  11. "tableName": "table_test",
  12. "serverTimezone": "UTC",
  13. "allMigration": false,
  14. "primaryKey": "id"
  15. }
  1. {
  2. "sourceType": "REDIS",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "host": "127.0.0.1",
  6. "port": 6379,
  7. "username": "username",
  8. "password": "password",
  9. "database": 0,
  10. "redisMode": "cluster",
  11. "timeout": 1000,
  12. "soTimeout": 1000
  13. }
  1. {
  2. "sourceType": "POSTGRESQL",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "username": "username",
  6. "password": "password",
  7. "hostname": "127.0.0.1",
  8. "port": 5432,
  9. "database": "database_test",
  10. "schema": "public",
  11. "decodingPluginName": "pgoutput",
  12. "tableNameList": [
  13. "table_test"
  14. ],
  15. "primaryKey": "id"
  16. }
  1. {
  2. "sourceType": "ORACLE",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "hostname": "127.0.0.1",
  6. "port": 1521,
  7. "username": "username",
  8. "password": "password",
  9. "database": "database_test",
  10. "schemaName": "schema_test",
  11. "tableName": "table_test",
  12. "scanStartupMode": "initial",
  13. "allMigration": false
  14. }
  1. {
  2. "sourceType": "MQTT",
  3. "sourceName": "test_source_ctl",
  4. "agentIp": "127.0.0.1",
  5. "serverURI": "tcp://127.0.0.1:1883",
  6. "username": "username",
  7. "password": "password",
  8. "topic": "topic_test",
  9. "qos": 1,
  10. "clientId": "client_test",
  11. "mqttVersion": "4"
  12. }

Sink:

  • Clickhouse
  • Hive
  • Elasticsearch
  • Kafka
  • MySQL
  • Oracle
  • PostgreSQL
  • SQLServer
  • Iceberg
  1. {
  2. "sinkType": "CLICKHOUSE",
  3. "sinkName": "test_sink_ctl",
  4. "dataNodeName": "test_clickhouse",
  5. "dbName": "db_test",
  6. "tableName": "table_test",
  7. "flushInterval": 1,
  8. "flushRecord": 1000,
  9. "retryTimes": 3,
  10. "engine": "Log",
  11. "isDistributed": 1,
  12. "enableCreateResource": 1,
  13. "sinkFieldList": []
  14. }
  1. {
  2. "sinkType": "HIVE",
  3. "sinkName": "test_sink_ctl",
  4. "dataNodeName": "hive_test",
  5. "dbName": "db_test",
  6. "tableName": "table_test",
  7. "enableCreateResource": 1,
  8. "dataEncoding": "UTF-8",
  9. "fileFormat": "TextFile",
  10. "dataSeparator": "124",
  11. "enableCreateResource": 1,
  12. "sinkFieldList": []
  13. }
  1. {
  2. "sinkType": "ELASTICSEARCH",
  3. "sinkName": "test_sink_ctl",
  4. "dataNodeName": "test_es",
  5. "indexName": "index_test",
  6. "documentType": "doc_test",
  7. "flushRecord": 1000,
  8. "primaryKey": "_id",
  9. "esVersion": 5,
  10. "enableCreateResource": 1,
  11. "sinkFieldList": []
  12. }
  1. {
  2. "sinkType": "KAFKA",
  3. "sinkName": "test_sink_ctl",
  4. "bootstrapServers": "127.0.0.1:9092",
  5. "topicName": "topic_test",
  6. "partitionNum": 3,
  7. "serializationType": "JSON",
  8. "autoOffsetReset": "earliest",
  9. "enableCreateResource": 1,
  10. "sinkFieldList": []
  11. }
  1. {
  2. "sinkType": "MYSQL",
  3. "sinkName": "test_sink_ctl",
  4. "dataNodeName": "test_mysql",
  5. "databaseName": "database_test",
  6. "tableName": "table_test",
  7. "primaryKey": "id",
  8. "enableCreateResource": 1,
  9. "sinkFieldList": []
  10. }
  1. {
  2. "sinkType": "ORACLE",
  3. "sinkName": "test_sink_ctl",
  4. "jdbcUrl": "jdbc:oracle:thin://127.0.0.1:1521/db_name",
  5. "username": "username",
  6. "password": "password",
  7. "tableName": "test_table",
  8. "primaryKey": "id",
  9. "enableCreateResource": 1,
  10. "sinkFieldList": []
  11. }
  1. {
  2. "sinkType": "POSTGRESQL",
  3. "sinkName": "test_sink_ctl",
  4. "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/db_name",
  5. "username": "username",
  6. "password": "password",
  7. "dbName": "db_test",
  8. "tableName": "table_test",
  9. "primaryKey": "id",
  10. "enableCreateResource": 1,
  11. "sinkFieldList": []
  12. }
  1. {
  2. "sinkType": "SQLSERVER",
  3. "sinkName": "test_sink_ctl",
  4. "jdbcUrl": "jdbc:sqlserver://127.0.0.1:1433;database=db_test",
  5. "tableName": "table_test",
  6. "schemaName": "schema_test",
  7. "username": "username",
  8. "password": "password",
  9. "serverTimezone": "UTC",
  10. "allMigration": false,
  11. "primaryKey": "id",
  12. "enableCreateResource": 1,
  13. "sinkFieldList": []
  14. }
  1. {
  2. "sinkType": "ICEBERG",
  3. "sinkName": "test_sink_ctl",
  4. "dataNodeName": "test_iceberg",
  5. "dbName": "db_test",
  6. "tableName": "table_test",
  7. "fileFormat": "Parquet",
  8. "primaryKey": "id",
  9. "enableCreateResource": 1,
  10. "sinkFieldList": []
  11. }

cluster

  1. $ bin/inlongctl create cluster

选项:

参数描述默认值
-f, —filejson 文件名称

json 示例:

  1. {
  2. "name": "test_cluster",
  3. "url": "127.0.0.1:8080",
  4. "clusterTags": "test_cluster_tag",
  5. "extTag": null,
  6. "description": null,
  7. "inCharges": "admin",
  8. "type": "PULSAR",
  9. "adminUrl": "http://127.0.0.1:8080",
  10. "tenant": "public"
  11. }

cluster-tag

  1. $ bin/inlongctl create cluster-tag

选项:

参数描述默认值
-f, —filejson 文件名称

json 示例:

  1. {
  2. "clusterTag": "test_cluster_tag",
  3. "inCharges": "ctl",
  4. "extParams": null,
  5. "description": null
  6. }

cluster-node

  1. $ bin/inlongctl create cluster-node

选项:

参数描述默认值
-f, —filejson 文件名称

json 示例:

  1. {
  2. "parentId": 1,
  3. "type": "AGENT",
  4. "ip": "127.0.0.1",
  5. "port": 8008,
  6. "extParams": null,
  7. "description": "null"
  8. }

parentId 为该节点对应 cluster 的 id,可通过 list clusterdescribe cluster 查看

user

  1. $ bin/inlongctl create user

选项:

参数描述默认值
-u, —username用户名称
-p, —password用户密码
-t, —type用户类型
-d, —day有效期

Update

update 用于修改相关资源,目前通过使用 json 文件的方式修改

命令:

  • cluster
  • cluster-tag
  • cluster-node
  • user

update 所需的 json 文件可以在 describe 得到的 json 上进行需要的修改即可。

cluster

  1. $ bin/inlongctl update cluster

选项:

参数描述默认值
-f, —filejson 文件名称

cluster-tag

  1. $ bin/inlongctl update cluster-tag

选项:

参数描述默认值
-f, —filejson 文件名称

cluster-node

  1. $ bin/inlongctl update cluster-node

选项:

参数描述默认值
-f, —filejson 文件名称

user

  1. $ bin/inlongctl update user

选项:

参数描述默认值
-u, —username用户名称
-p, —password新密码
-d, —day新有效期

Suspend

suspend 用于暂停 inlong 任务

命令:

  • group

group

  1. $ bin/inlongctl suspend group

选项:

参数描述默认值
-g, —groupinlong group id

Restart

restart 用于重启 inlong 任务

命令:

  • group

group

  1. $ bin/inlongctl restart group

选项:

参数描述默认值
-g, —groupinlong group id

Delete

delete 用于删除相关资源。

命令:

  • group
  • stream
  • source
  • sink
  • cluster
  • cluster-tag
  • cluster-node
  • user

group

  1. $ bin/inlongctl delete group

选项:

参数描述默认值
-g, —groupinlong group id,支持模糊查询
-s, —statusinlong group status ,可选值有:CREATEREJECTEDINITIALIZINGOPERATING
STARTEDFAILEDSTOPPEDFINISHEDDELETED
-n, —num最多显示条数10

group status 说明

group status描述
CREATE待提交、待审批状态
REJECTED审批被驳回
INITIALIZING配置中
OPERATING删除中、停止中以及重启中
STARTED配置成功以及重启成功
FAILED配置失败
STOPPED暂停
FINISHED停止
DELETED已删除

stream

  1. $ bin/inlongctl delete stream

选项:

参数描述默认值
-g, —group *inlong group id,该 inlong stream 所在的 inlong group

source

  1. $ bin/inlongctl delete source

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id
-t, —typestream source type,可选值有:AUTO_PUSH, TUBEMQ, PULSAR, KAFKA, FILE, MYSQL_SQL,
MYSQL_BINLOG, POSTGRESQL, ORACLE, SQLSERVER, MONGODB, REDIS

stream source type 说明

stream source type描述
AUTO_PUSH自主推送
TUBEMQTubeMQ
PULSARPulsar
KAFKAKafka
FILE文件
MYSQL_SQLSQL
MYSQL_BINLOGBinlog
POSTGRESQLPostgreSQL
ORACLEOracle
SQLSERVERSQL server
MONGODBMongoDB
REDISRedis

sink

  1. $ bin/inlongctl delete sink

选项:

参数描述默认值
-g, —group inlong group id
-s, —stream inlong stream id,该 stream sink 所在的 inlong stream

cluster-tag

  1. $ bin/inlongctl delete cluster-tag

选项:

参数描述默认值
-id, —id *cluster tag id

cluster

  1. $ bin/inlongctl delete cluster

选项:

参数描述默认值
-id, —id *cluster id

cluster-node

  1. $ bin/inlongctl delete cluster-node

选项:

参数描述默认值
-id, —id *cluster node id

user

  1. $ bin/inlongctl delete user

选项:

参数描述默认值
-id, —id *user id

Log

创建任务流程之后,可以使用 log 命令查看任务各阶段的执行日志

命令:

  • group

group

  1. $ bin/inlongctl log group

选项:

参数描述默认值
-g, —groupinlong group id,不支持模糊查询