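Distributed join is an important query optimization and execution technique for large-data scenarios. When two or more tables hold large amounts of data, a join between them should be executed in parallel wherever possible, to improve execution efficiency and reduce query response time.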

    This section introduces several typical distributed join scenarios supported by OceanBase.

    Partition-Wise Join

    create table t2 (c1 int, c2 int) partition by key(c1) partitions 4;
    create table t3 (c1 int, c2 int) partition by key(c1) partitions 4;
    explain select * from t2, t3 where t2.c1 = t3.c1 and t2.c2=t3.c2\G
    *************************** 1. row ***************************
    Query Plan:
    =============================================
    |ID|OPERATOR            |NAME|EST. ROWS|COST|
    ---------------------------------------------
    |0 |EXCHANGE IN DISTR   |    |63       |8374|
    |1 | EXCHANGE OUT DISTR |    |63       |8362|
    |2 |  HASH JOIN         |    |63       |8362|
    |3 |   TABLE SCAN       |t2  |4000     |499 |
    |4 |   TABLE SCAN       |t3  |4000     |499 |
    =============================================
    Outputs & filters:
    -------------------------------------
      0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
      1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
      2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
          equal_conds([t2.c1 = t3.c1], [t2.c2 = t3.c2]), other_conds(nil)
      3 - output([t2.c1], [t2.c2]), filter(nil),
          access([t2.c1], [t2.c2]), partitions(p[0-3])
      4 - output([t3.c1], [t3.c2]), filter(nil),
          access([t3.c1], [t3.c2]), partitions(p[0-3])
    Example 1

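    A partition-wise join applies when the join condition includes the partitioning keys of both tables and the two tables are partitioned in exactly the same way. Take the query in Example 1: t2 and t3 are both KEY-partitioned on column c1 into 4 partitions, so their partitioning schemes are identical. The join condition t2.c1 = t3.c1 and t2.c2 = t3.c2 fully covers the partitioning key, and because rows with equal c1 values are mapped to the same partition number in both tables, each partition of t2 only needs to be joined with the corresponding partition of t3. The query can therefore be executed partition by partition: with a degree of parallelism of 4, all 4 partition pairs are joined concurrently and the results are then returned. This is visible in the plan, where the HASH JOIN (operator 2) reads both tables directly and neither table is redistributed.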
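The pairing of partitions can be illustrated with a small Python simulation. This is a minimal sketch, not OceanBase's implementation: `partition_of` is a hypothetical stand-in for the KEY partitioning hash (Python's built-in `hash` modulo 4), the helper names `partition_table`, `hash_join` and `partition_wise_join` are made up for illustration, and rows are plain `(c1, c2)` tuples.

```python
from collections import defaultdict

NUM_PARTITIONS = 4  # same partition count as the t2/t3 DDL above


def partition_of(key):
    # Hypothetical stand-in for OceanBase's KEY partitioning hash; the real
    # function differs, but any deterministic function of the key works here.
    return hash(key) % NUM_PARTITIONS


def partition_table(rows, key_index):
    # Split rows into partitions by the column at position key_index.
    parts = defaultdict(list)
    for row in rows:
        parts[partition_of(row[key_index])].append(row)
    return parts


def hash_join(left, right, keys):
    # Build a hash table on `right`, then probe it with `left`.
    # `keys` lists the column positions that must be equal on both sides.
    table = defaultdict(list)
    for rrow in right:
        table[tuple(rrow[i] for i in keys)].append(rrow)
    return [lrow + rrow
            for lrow in left
            for rrow in table.get(tuple(lrow[i] for i in keys), [])]


def partition_wise_join(t2, t3):
    # Both tables are partitioned on c1 (position 0) with the same function
    # and count, so partition i of t2 can only match partition i of t3.
    p2 = partition_table(t2, 0)
    p3 = partition_table(t3, 0)
    result = []
    for i in range(NUM_PARTITIONS):  # each iteration is independent of the others
        result.extend(hash_join(p2.get(i, []), p3.get(i, []), (0, 1)))
    return result


t2 = [(1, 10), (2, 20), (3, 30), (5, 50)]
t3 = [(1, 10), (2, 99), (5, 50), (7, 70)]
print(sorted(partition_wise_join(t2, t3)))  # [(1, 10, 1, 10), (5, 50, 5, 50)]
```

Because both sides use the same function and the same partition count, no iteration of the loop ever needs rows from another partition, which is why each partition pair could be handed to a separate worker.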

    Redistributing one table according to the other table's partitioning scheme

    Again using tables t2 and t3 as an example:

    explain select * from t2, t3 where t2.c1 = t3.c2\G
    *************************** 1. row ***************************
    Query Plan:
    =======================================================
    |ID|OPERATOR                     |NAME|EST. ROWS|COST |
    -------------------------------------------------------
    |0 |EXCHANGE IN DISTR            |    |31680    |35454|
    |1 | EXCHANGE OUT DISTR          |    |31680    |29456|
    |2 |  HASH JOIN                  |    |31680    |29456|
    |3 |   TABLE SCAN                |t2  |4000     |499  |
    |4 |   EXCHANGE IN DISTR         |    |4000     |878  |
    |5 |    EXCHANGE OUT DISTR (PKEY)|    |4000     |499  |
    |6 |     TABLE SCAN              |t3  |4000     |499  |
    =======================================================
    Outputs & filters:
    -------------------------------------
      0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
      1 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil)
      2 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
          equal_conds([t2.c1 = t3.c2]), other_conds(nil)
      3 - output([t2.c1], [t2.c2]), filter(nil),
          access([t2.c1], [t2.c2]), partitions(p[0-3])
      4 - output([t3.c1], [t3.c2]), filter(nil)
      5 - (#keys=1, [t3.c2]), output([t3.c1], [t3.c2]), filter(nil)
      6 - output([t3.c1], [t3.c2]), filter(nil),
          access([t3.c1], [t3.c2]), partitions(p[0-3])
    Example 2

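    In the query of Example 2, the join condition covers the partitioning key of t2 but not that of t3. The plan therefore leaves t2 in place and redistributes t3 to match t2's partitioning scheme: operator 5, EXCHANGE OUT DISTR (PKEY) with #keys=1, [t3.c2], sends each t3 row to the t2 partition that a c1 value equal to its c2 would fall into. Once t3 has been repartitioned by c2 using the same partitioning method as t2.c1, the join can be executed in parallel, one t2 partition at a time.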
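A minimal Python sketch, under the same assumptions as a toy model (a hypothetical `partition_of` built on Python's `hash` modulo 4, rows as `(c1, c2)` tuples, made-up helper names), shows why shuffling only t3 is enough: once each t3 row is placed by `partition_of(t3.c2)`, any t3 row that can match a t2 row lands in the same partition number as that t2 row.

```python
from collections import defaultdict

NUM_PARTITIONS = 4


def partition_of(key):
    # Hypothetical stand-in for the KEY partitioning function of t2 (column c1).
    return hash(key) % NUM_PARTITIONS


def hash_join(left, right, lk, rk):
    # Build on `right`, probe with `left`; lk/rk are the join column positions.
    table = defaultdict(list)
    for rrow in right:
        table[rrow[rk]].append(rrow)
    return [lrow + rrow for lrow in left for rrow in table.get(lrow[lk], [])]


def join_with_pkey_redistribution(t2, t3):
    # t2 stays as stored: already partitioned by c1 (position 0).
    t2_parts = defaultdict(list)
    for row in t2:
        t2_parts[partition_of(row[0])].append(row)
    # t3 is shuffled by c2 (position 1) with t2's partitioning function,
    # playing the role of EXCHANGE OUT DISTR (PKEY) in Example 2.
    t3_parts = defaultdict(list)
    for row in t3:
        t3_parts[partition_of(row[1])].append(row)
    # Rows with t2.c1 == t3.c2 now share a partition number.
    result = []
    for i in range(NUM_PARTITIONS):
        result.extend(hash_join(t2_parts.get(i, []), t3_parts.get(i, []), 0, 1))
    return result


t2 = [(1, 10), (2, 20), (6, 60)]
t3 = [(100, 1), (200, 2), (300, 2), (400, 9)]
print(sorted(join_with_pkey_redistribution(t2, t3)))
# [(1, 10, 100, 1), (2, 20, 200, 2), (2, 20, 300, 2)]
```

Only the table whose partitioning key is not covered by the join condition pays the cost of being moved; t2 is read where it already lives.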

    Redistributing both tables

    explain select * from t2, t3 where t2.c2 = t3.c2\G
    *************************** 1. row ***************************
    Query Plan:
    ==============================================
    |ID|OPERATOR            |NAME|EST. ROWS|COST |
    ----------------------------------------------
    |0 |HASH JOIN           |    |31680    |29835|
    |1 | EXCHANGE IN DISTR  |    |4000     |878  |
    |2 |  EXCHANGE OUT DISTR|    |4000     |499  |
    |3 |   TABLE SCAN       |t2  |4000     |499  |
    |4 | EXCHANGE IN DISTR  |    |4000     |878  |
    |5 |  EXCHANGE OUT DISTR|    |4000     |499  |
    |6 |   TABLE SCAN       |t3  |4000     |499  |
    ==============================================
    Outputs & filters:
    -------------------------------------
      0 - output([t2.c1], [t2.c2], [t3.c1], [t3.c2]), filter(nil),
          equal_conds([t2.c2 = t3.c2]), other_conds(nil)
      1 - output([t2.c1], [t2.c2]), filter(nil)
      2 - output([t2.c1], [t2.c2]), filter(nil)
      3 - output([t2.c1], [t2.c2]), filter(nil),
          access([t2.c1], [t2.c2]), partitions(p[0-3])
      4 - output([t3.c1], [t3.c2]), filter(nil)
      5 - output([t3.c1], [t3.c2]), filter(nil)
      6 - output([t3.c1], [t3.c2]), filter(nil),
          access([t3.c1], [t3.c2]), partitions(p[0-3])
    Example 3

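    For the same tables t2 and t3, if the join condition contains neither t2's partitioning key nor t3's, the optimizer generates a plan like the one in Example 3: both tables are scanned partition by partition and their rows are sent through EXCHANGE operators to a single place. Note that the join part of this plan (operator 0, HASH JOIN) is executed in the main thread. In later OceanBase versions, the join part of such plans will also be executed in parallel on worker threads.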
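The shape of the plan in Example 3 can be sketched the same way. This is a simplified illustration under stated assumptions, not OceanBase code: the partitioning hash is a hypothetical `hash(c1) % 4`, in-memory lists stand in for partitions and for the EXCHANGE operators, and `scan_partitions`, `gather` and `coordinator_hash_join` are made-up names. Each table's partitions are scanned, their rows are gathered to one place, and a single hash join on c2 runs there.

```python
from collections import defaultdict

NUM_PARTITIONS = 4


def scan_partitions(rows):
    # Simulate a table stored as 4 partitions keyed on c1 (position 0);
    # the hash function is a hypothetical stand-in for KEY partitioning.
    parts = defaultdict(list)
    for row in rows:
        parts[hash(row[0]) % NUM_PARTITIONS].append(row)
    return [parts.get(i, []) for i in range(NUM_PARTITIONS)]


def gather(partitions):
    # EXCHANGE OUT / EXCHANGE IN: every partition's scan output is sent
    # to the single consumer that performs the join.
    out = []
    for part in partitions:
        out.extend(part)
    return out


def coordinator_hash_join(t2, t3):
    # The join key t2.c2 = t3.c2 (position 1) matches neither table's
    # partitioning key, so partitions cannot be paired up; both inputs are
    # gathered and joined in one place, as in the plan of Example 3.
    left = gather(scan_partitions(t2))
    right = gather(scan_partitions(t3))
    table = defaultdict(list)
    for rrow in right:
        table[rrow[1]].append(rrow)
    return [lrow + rrow for lrow in left for rrow in table.get(lrow[1], [])]


t2 = [(1, 7), (2, 8), (3, 7)]
t3 = [(10, 7), (20, 9)]
print(sorted(coordinator_hash_join(t2, t3)))  # [(1, 7, 10, 7), (3, 7, 10, 7)]
```

The scans still run per partition, but the join itself is a single serial step, which is the part the text says later versions will move onto parallel worker threads.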