分布式连接是大数据量场景中很重要的一个查询优化执行手段,当两个或多个表的数据量比较大的时候,在执行两表或者多表的连接的时候,需要尽量的将 JOIN 通过并行执行的方式以提高执行效率,减小查询的响应时间。

本节主要介绍 OceanBase 数据库支持的分布式连接的几种典型场景。

PARTITION-WISE JOIN 场景

PARTITION-WISE JOIN 是指两表的连接条件包含了两表的分区键,并且两表的分区方式是一样的。

在如下例所示,t2 和 t3 都是在 c1 这个列上进行了 key 分区,分区总数为 4 个分区,分区模式完全相同。当查询的条件为 t2.c1=t3.c1 and t2.c2=t3.c2 时,查询条件完全包含了分区键,查询可以以分区为单位在每个分区内进行。如果并行度为 4 的话,该查询可以同时做 4 个分区的JOIN 并且将最后结果输出。

  1. create table t2 (c1 int, c2 int) partition by key(c1) partitions 4;
  2. create table t3 (c1 int, c2 int) partition by key(c1) partitions 4;
  1. explain select * from t2, t3 where t2.c1 = t3.c1 and t2.c2=t3.c2\G
  2. *************************** 1. row ***************************
  3. Query Plan:
  4. ============================================
  5. |ID|OPERATOR |NAME|EST. ROWS|COST|
  6. --------------------------------------------
  7. |0 |EXCHANGE IN DISTR | |63 |8374|
  8. |1 | EXCHANGE OUT DISTR| |63 |8362|
  9. |2 | HASH JOIN | |63 |8362|
  10. |3 | TABLE SCAN |t2 |4000 |499 |
  11. |4 | TABLE SCAN |t3 |4000 |499 |
  12. ============================================
  13. Outputs & filters:
  14. -------------------------------------
  15. 0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  16. 1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  17. 2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
  18. equal_conds([t2.c1 = t3.c1], [t2.c2 = t3.c2]), other_conds(nil)
  19. 3 - output([t2.c1], [t2.c2]), filter(nil),
  20. access([t2.c1], [t2.c2]), partitions(p[0-3])
  21. 4 - output([t3.c1], [t3.c2]), filter(nil),
  22. access([t3.c1], [t3.c2]), partitions(p[0-3])

以一边分区表的分区模式重新分布另一个表的场景

同样以 t2 和 t3 表为例,如下例所示,查询 JOIN 条件覆盖了 t2 的分区键而没有覆盖 t3 的分区键,所以执行计划将会按照 t2 的分区模式,将 t3 的数据进行以 partition key 为目标的分组。在将 t3 的数据依照 c2 的值和 t2 的 c1 列的分区方式打散重分布后,可以以 t2 的分区为单位进行分区间的并行连接。

  1. explain select * from t2, t3 where t2.c1 = t3.c2\G
  2. *************************** 1. row ***************************
  3. Query Plan:
  4. =======================================================
  5. |ID|OPERATOR |NAME|EST. ROWS|COST |
  6. -------------------------------------------------------
  7. |0 |EXCHANGE IN DISTR | |31680 |35454|
  8. |1 | EXCHANGE OUT DISTR | |31680 |29456|
  9. |2 | HASH JOIN | |31680 |29456|
  10. |3 | TABLE SCAN |t2 |4000 |499 |
  11. |4 | EXCHANGE IN DISTR | |4000 |878 |
  12. |5 | EXCHANGE OUT DISTR (PKEY)| |4000 |499 |
  13. |6 | TABLE SCAN |t3 |4000 |499 |
  14. =======================================================
  15. Outputs & filters:
  16. -------------------------------------
  17. 0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  18. 1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
  19. 2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
  20. equal_conds([t2.c1 = t3.c2]), other_conds(nil)
  21. 3 - output([t2.c1], [t2.c2]), filter(nil),
  22. access([t2.c1], [t2.c2]), partitions(p[0-3])
  23. 4 - output([t3.c1], [t3.c2]), filter(nil)
  24. 5 - (#keys=1, [t3.c2]), output([t3.c1], [t3.c2]), filter(nil)
  25. 6 - output([t3.c1], [t3.c2]), filter(nil),
  26. access([t3.c1], [t3.c2]), partitions(p[0-3])

两个表都重新分布的场景

对于同样的 t2 和 t3 表,如果连接条件既没有包含 t2 的分区键也没有包含 t3 的分区键,会生成如下例所示的计划。注意这个计划的 JOIN 部分是在主线程完成。在后续的 OceanBase 数据库版本中,这样的计划的 JOIN 部分也会在工作线程以并行的方式执行。

  1. explain select * from t2, t3 where t2.c2 = t3.c2\G
  2. *************************** 1. row ***************************
  3. Query Plan:
  4. ==============================================
  5. |ID|OPERATOR |NAME|EST. ROWS|COST |
  6. ----------------------------------------------
  7. |0 |HASH JOIN | |31680 |29835|
  8. |1 | EXCHANGE IN DISTR | |4000 |878 |
  9. |2 | EXCHANGE OUT DISTR| |4000 |499 |
  10. |3 | TABLE SCAN |t2 |4000 |499 |
  11. |4 | EXCHANGE IN DISTR | |4000 |878 |
  12. |5 | EXCHANGE OUT DISTR| |4000 |499 |
  13. |6 | TABLE SCAN |t3 |4000 |499 |
  14. ==============================================
  15. Outputs & filters:
  16. -------------------------------------
  17. 0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
  18. equal_conds([t2.c2 = t3.c2]), other_conds(nil)
  19. 1 - output([t2.c1], [t2.c2]), filter(nil)
  20. 2 - output([t2.c1], [t2.c2]), filter(nil)
  21. 3 - output([t2.c1], [t2.c2]), filter(nil),
  22. access([t2.c1], [t2.c2]), partitions(p[0-3])
  23. 4 - output([t3.c1], [t3.c2]), filter(nil)
  24. 5 - output([t3.c1], [t3.c2]), filter(nil)
  25. 6 - output([t3.c1], [t3.c2]), filter(nil),
  26. access([t3.c1], [t3.c2]), partitions(p[0-3])