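EXPLAIN Statements

The EXPLAIN statement explains how a query or INSERT statement will be executed by printing its logical and optimized plans, which also makes it useful when tuning a query.

Run an EXPLAIN statement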

Java

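EXPLAIN statements can be executed with the executeSql() method of TableEnvironment. If the EXPLAIN operation succeeds, executeSql() returns the explain result; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.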

Scala

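EXPLAIN statements can be executed with the executeSql() method of TableEnvironment. If the EXPLAIN operation succeeds, executeSql() returns the explain result; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.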

Python

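EXPLAIN statements can be executed with the execute_sql() method of TableEnvironment. If the EXPLAIN operation succeeds, execute_sql() returns the explain result; otherwise it throws an exception.

The following example shows how to run an EXPLAIN statement in TableEnvironment.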

SQL CLI

EXPLAIN statements can be executed in the SQL CLI.

The following example shows how to run an EXPLAIN statement in the SQL CLI.

Java

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Register the tables MyTable1 and MyTable2
    tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");
    tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')");

    // Explain the SELECT statement with TableEnvironment.explainSql()
    String explanation = tEnv.explainSql(
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
        "UNION ALL " +
        "SELECT `count`, word FROM MyTable2");
    System.out.println(explanation);

    // Explain the SELECT statement with TableEnvironment.executeSql()
    TableResult tableResult = tEnv.executeSql(
        "EXPLAIN PLAN FOR " +
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
        "UNION ALL " +
        "SELECT `count`, word FROM MyTable2");
    tableResult.print();

    // Explain the same statement again, this time requesting extra ExplainDetails
    TableResult tableResult2 = tEnv.executeSql(
        "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
        "UNION ALL " +
        "SELECT `count`, word FROM MyTable2");
    tableResult2.print();

Scala

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tEnv = StreamTableEnvironment.create(env)

    // Register the tables MyTable1 and MyTable2
    tEnv.executeSql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
    tEnv.executeSql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

    // Explain the SELECT statement with TableEnvironment.explainSql()
    val explanation = tEnv.explainSql(
      "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
      "UNION ALL " +
      "SELECT `count`, word FROM MyTable2")
    println(explanation)

    // Explain the SELECT statement with TableEnvironment.executeSql()
    val tableResult = tEnv.executeSql(
      "EXPLAIN PLAN FOR " +
      "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
      "UNION ALL " +
      "SELECT `count`, word FROM MyTable2")
    tableResult.print()

    // Explain the same statement again, this time requesting extra ExplainDetails
    val tableResult2 = tEnv.executeSql(
      "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN " +
      "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' " +
      "UNION ALL " +
      "SELECT `count`, word FROM MyTable2")
    tableResult2.print()

Python

    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    # env is a StreamExecutionEnvironment created elsewhere; the settings builder chain is elided
    settings = EnvironmentSettings.new_instance()...
    t_env = StreamTableEnvironment.create(env, settings)

    # Register the tables MyTable1 and MyTable2
    t_env.execute_sql("CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")
    t_env.execute_sql("CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen')")

    # Explain the SELECT statement with TableEnvironment.explain_sql()
    explanation1 = t_env.explain_sql(
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
        "UNION ALL "
        "SELECT `count`, word FROM MyTable2")
    print(explanation1)

    # Explain the SELECT statement with TableEnvironment.execute_sql()
    table_result = t_env.execute_sql(
        "EXPLAIN PLAN FOR "
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
        "UNION ALL "
        "SELECT `count`, word FROM MyTable2")
    table_result.print()

    # Explain the same statement again, this time requesting extra ExplainDetails
    table_result2 = t_env.execute_sql(
        "EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN "
        "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%' "
        "UNION ALL "
        "SELECT `count`, word FROM MyTable2")
    table_result2.print()

SQL CLI

    Flink SQL> CREATE TABLE MyTable1 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
    [INFO] Table has been created.

    Flink SQL> CREATE TABLE MyTable2 (`count` bigint, word VARCHAR(256)) WITH ('connector' = 'datagen');
    [INFO] Table has been created.

    Flink SQL> EXPLAIN PLAN FOR SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'
    > UNION ALL
    > SELECT `count`, word FROM MyTable2;

    Flink SQL> EXPLAIN ESTIMATED_COST, CHANGELOG_MODE, JSON_EXECUTION_PLAN SELECT `count`, word FROM MyTable1
    > WHERE word LIKE 'F%'
    > UNION ALL
    > SELECT `count`, word FROM MyTable2;

The EXPLAIN results are as follows:

EXPLAIN PLAN

    == Abstract Syntax Tree ==
    LogicalUnion(all=[true])
    :- LogicalProject(count=[$0], word=[$1])
    :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
    +- LogicalProject(count=[$0], word=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

    == Optimized Physical Plan ==
    Union(all=[true], union=[count, word])
    :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

    == Optimized Execution Plan ==
    Union(all=[true], union=[count, word])
    :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
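
When the plan is captured as a string (for example, the value returned by explain_sql()), it can be convenient to split it on the `== ... ==` section headers before inspecting a single part. The following is a minimal Python sketch under that assumption; `split_plan_sections` is a hypothetical helper and not part of Flink, and the sample text is an abbreviated copy of the output above.

```python
import re


def split_plan_sections(explain_output: str) -> dict:
    """Split EXPLAIN output into {section title: section body}.

    Hypothetical helper: it relies only on the '== Title ==' header
    lines visible in the sample output.
    """
    sections = {}
    current = None
    for line in explain_output.splitlines():
        header = re.fullmatch(r"==\s*(.+?)\s*==", line.strip())
        if header:
            # A header line starts a new section.
            current = header.group(1)
            sections[current] = []
        elif current is not None and line.strip():
            # Non-empty lines belong to the most recent section.
            sections[current].append(line)
    return {title: "\n".join(body) for title, body in sections.items()}


# Abbreviated sample, copied from the EXPLAIN PLAN output
sample = """\
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

== Optimized Physical Plan ==
Union(all=[true], union=[count, word])
"""

sections = split_plan_sections(sample)
print(list(sections))
```

Keeping the body lines unmodified preserves the tree indentation, so a section can still be printed or compared as-is.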

EXPLAIN PLAN WITH DETAILS

    == Abstract Syntax Tree ==
    LogicalUnion(all=[true])
    :- LogicalProject(count=[$0], word=[$1])
    :  +- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    :     +- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
    +- LogicalProject(count=[$0], word=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])

    == Optimized Physical Plan ==
    Union(all=[true], union=[count, word], changelogMode=[I]): rowcount = 1.05E8, cumulative cost = {3.1E8 rows, 3.05E8 cpu, 4.0E9 io, 0.0 network, 0.0 memory}
    :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): rowcount = 5000000.0, cumulative cost = {1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word], changelogMode=[I]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}

    == Optimized Execution Plan ==
    Union(all=[true], union=[count, word])
    :- Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    :  +- TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
    +- TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])

    == Physical Execution Plan ==
    {
      "nodes" : [ {
        "id" : 37,
        "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
        "pact" : "Data Source",
        "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])",
        "parallelism" : 1
      }, {
        "id" : 38,
        "type" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
        "pact" : "Operator",
        "contents" : "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])",
        "parallelism" : 1,
        "predecessors" : [ {
          "id" : 37,
          "ship_strategy" : "FORWARD",
          "side" : "second"
        } ]
      }, {
        "id" : 39,
        "type" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
        "pact" : "Data Source",
        "contents" : "Source: TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])",
        "parallelism" : 1
      } ]
    }
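
The cost annotations in the Optimized Physical Plan follow a regular `rowcount = ..., cumulative cost = {...}` shape, so they can be pulled out of a plan line for comparison between query variants. The sketch below is one way to do that in Python; `parse_cost` and `COST_PATTERN` are hypothetical names, and the pattern is inferred only from the sample output above, not from a documented format.

```python
import re

# Pattern inferred from the sample output; treat it as an assumption.
COST_PATTERN = re.compile(
    r"rowcount = (?P<rowcount>[0-9.E+-]+), "
    r"cumulative cost = \{(?P<cost>[^}]*)\}"
)


def parse_cost(plan_line: str):
    """Return (rowcount, {metric: value}), or None if the line has no cost annotation."""
    match = COST_PATTERN.search(plan_line)
    if match is None:
        return None
    cost = {}
    # Each item looks like "1.05E8 rows": a number followed by the metric name.
    for item in match.group("cost").split(", "):
        value, metric = item.split(" ")
        cost[metric] = float(value)
    return float(match.group("rowcount")), cost


# The Calc line from the Optimized Physical Plan above
line = (
    "Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], changelogMode=[I]): "
    "rowcount = 5000000.0, cumulative cost = "
    "{1.05E8 rows, 1.0E8 cpu, 2.0E9 io, 0.0 network, 0.0 memory}"
)

rowcount, cost = parse_cost(line)
print(rowcount, cost["rows"], cost["io"])
```

Lines without ESTIMATED_COST annotations, such as those in the Optimized Execution Plan section, simply yield None.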

ExplainDetails

Prints the plan of the statement with the specified ExplainDetail types.

ESTIMATED_COST: generates the cost information that the optimizer estimates for each physical node,
    e.g. TableSourceScan(..., cumulative cost = {1.0E8 rows, 1.0E8 cpu, 2.4E9 io, 0.0 network, 0.0 memory})
CHANGELOG_MODE: generates the changelog mode for every physical RelNode,
    e.g. GroupAggregate(..., changelogMode=[I,UA,D])
JSON_EXECUTION_PLAN: generates the execution plan of the program in JSON format.
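
To show how the detail names combine into a statement, here is a small Python sketch that assembles the EXPLAIN text in the same form used by the examples on this page. `build_explain` and `ALLOWED_DETAILS` are hypothetical names; the allowed set is limited to the three details listed here, which is an assumption about the Flink version in use.

```python
# The three ExplainDetail names described on this page (assumed to be the full set).
ALLOWED_DETAILS = ("ESTIMATED_COST", "CHANGELOG_MODE", "JSON_EXECUTION_PLAN")


def build_explain(statement: str, details=()) -> str:
    """Prefix a query or INSERT statement with an EXPLAIN clause."""
    unknown = [d for d in details if d not in ALLOWED_DETAILS]
    if unknown:
        raise ValueError(f"unsupported ExplainDetail: {unknown}")
    if not details:
        # Without details, fall back to the plain form.
        return f"EXPLAIN PLAN FOR {statement}"
    return f"EXPLAIN {', '.join(details)} {statement}"


query = "SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'"
print(build_explain(query))
print(build_explain(query, ["ESTIMATED_COST", "CHANGELOG_MODE"]))
```

The resulting string is what would be passed to execute_sql(); validating the names up front surfaces typos before the statement reaches the SQL parser.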

Syntax

    EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement_or_statement_set>

    statement_set:
        EXECUTE STATEMENT SET
        BEGIN
            insert_statement;
            ...
            insert_statement;
        END;
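
The statement_set form can be assembled mechanically from a list of INSERT statements. The Python sketch below only builds the text according to the grammar above; `build_statement_set` is a hypothetical helper, and the sink tables MySink1 and MySink2 are made-up names used for illustration.

```python
def build_statement_set(insert_statements) -> str:
    """Wrap INSERT statements in EXECUTE STATEMENT SET BEGIN ... END;"""
    if not insert_statements:
        raise ValueError("a statement set needs at least one INSERT statement")
    lines = ["EXECUTE STATEMENT SET", "BEGIN"]
    for statement in insert_statements:
        # Normalize so that every statement ends with exactly one semicolon.
        lines.append(statement.strip().rstrip(";") + ";")
    lines.append("END;")
    return "\n".join(lines)


# MySink1 and MySink2 are hypothetical sink tables.
statement_set = build_statement_set([
    "INSERT INTO MySink1 SELECT `count`, word FROM MyTable1 WHERE word LIKE 'F%'",
    "INSERT INTO MySink2 SELECT `count`, word FROM MyTable2;",
])
explain_sql = "EXPLAIN PLAN FOR " + statement_set
print(explain_sql)
```

Explaining the whole set at once, rather than each INSERT separately, lets the plan show how the statements are optimized together.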

For the query syntax, see the Queries page. For the INSERT syntax, see the INSERT page.