EXCHANGE 算子用于线程间进行数据交互的算子。

EXCHANGE 算子适用于在分布式场景,一般都是成对出现的,数据源端有一个 OUT 算子,目的端会有一个 IN 算子。

EXCH-IN/OUT

EXCH-IN/OUT 即 EXCHANGE IN/ EXCHANGE OUT 用于将多个分区上的数据汇聚到一起,发送到查询所在的主节点上。

如下例所示,下面的查询中访问了 5 个分区的数据(p0-p4),其中 1 号算子接受 2 号算子产生的输出,并将数据传出;0 号算子接收多个分区上 1 号算子产生的输出,并将结果汇总输出。

  1. obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
  2. Query OK, 0 rows affected (0.12 sec)
  3. obclient>EXPLAIN SELECT * FROM t\G;
  4. *************************** 1. row ***************************
  5. Query Plan:
  6. ==============================================
  7. |ID|OPERATOR |NAME|EST. ROWS|COST |
  8. ----------------------------------------------
  9. |0 |EXCHANGE IN DISTR | |500000 |545109|
  10. |1 | EXCHANGE OUT DISTR| |500000 |320292|
  11. |2 | TABLE SCAN |T |500000 |320292|
  12. ==============================================
  13. Outputs & filters:
  14. -------------------------------------
  15. 0 - output([T.C1], [T.C2]), filter(nil)
  16. 1 - output([T.C1], [T.C2]), filter(nil)
  17. 2 - output([T.C1], [T.C2]), filter(nil),
  18. access([T.C1], [T.C2]), partitions(p[0-4])

上述示例的执行计划展示中的 outputs & filters 详细列出了 EXCH-IN/OUT 算子的输出信息如下:

信息名称

含义

output

该算子输出的表达式。

filter

该算子上的过滤条件。

由于示例中 EXCH-IN/OUT 算子没有设置 filter,所以为 nil。

EXCH-IN/OUT (REMOTE)

EXCH-IN/OUT (REMOTE) 算子用于将远程的数据(单个分区的数据)拉回本地。

如下例所示,在 A 机器上创建了一张非分区表,在 B 机器上执行查询,读取该表的数据。此时,由于待读取的数据在远程,执行计划中分配了 0 号算子和 1 号算子来拉取远程的数据。其中,1 号算子在 A 机器上执行,读取 t 表的数据,并将数据传出;0 号算子在 B 机器上执行,接收 1 号算子产生的输出。

  1. obclient>CREATE TABLE t (c1 INT, c2 INT);
  2. Query OK, 0 rows affected (0.12 sec)
  3. obclient>EXPLAIN SELECT * FROM t\G;
  4. *************************** 1. row ***************************
  5. Query Plan:
  6. ===============================================
  7. |ID|OPERATOR |NAME|EST. ROWS|COST |
  8. -----------------------------------------------
  9. |0 |EXCHANGE IN REMOTE | |100000 |109029|
  10. |1 | EXCHANGE OUT REMOTE| |100000 |64066 |
  11. |2 | TABLE SCAN |T |100000 |64066 |
  12. ===============================================
  13. Outputs & filters:
  14. -------------------------------------
  15. 0 - output([T.C1], [T.C2]), filter(nil)
  16. 1 - output([T.C1], [T.C2]), filter(nil)
  17. 2 - output([T.C1], [T.C2]), filter(nil),
  18. access([T.C1], [T.C2]), partitions(p0)

上述示例的执行计划展示中的 outputs & filters 详细列出了 EXCH-IN/OUT (REMOTE) 算子的输出信息,字段的含义与 EXCH-IN/OUT 算子相同。

EXCH-IN/OUT (PKEY)

EXCH-IN/OUT (PKEY) 算子用于数据重分区。它通常用于二元算子中,将一侧孩子节点的数据按照另外一些孩子节点的分区方式进行重分区。

如下示例中,该查询是对两个分区表的数据进行联接,执行计划将 s 表的数据按照 t 的分区方式进行重分区,4 号算子的输入是 s 表扫描的结果,对于 s 表的每一行,该算子会根据 t 表的数据分区,以及根据查询的联接条件,确定一行数据应该发送到哪个节点进行。

此外,可以看到 3 号算子是一个 EXCHANGE IN MERGE SORT DISTR,它是一个特殊的 EXCHANGE IN 算子,它用于在汇总多个分区的数据时,会进行一定的归并排序,在这个执行计划中,3 号算子接收到的每个分区的数据都是按照 c1 有序排列的,它会对每个接收到的数据进行归并排序,从而保证结果输出结果也是按照 c1 有序排列的。

  1. obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 5;
  2. Query OK, 0 rows affected (0.12 sec)
  3. obclient>CREATE TABLE s (c1 INT PRIMARY KEY, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
  4. Query OK, 0 rows affected (0.12 sec)
  5. obclient>EXPLAIN SELECT * FROM s, t WHERE s.c1 = t.c1\G;
  6. *************************** 1. row ***************************
  7. Query Plan:
  8. ===============================================================
  9. |ID|OPERATOR |NAME|EST. ROWS |COST |
  10. ---------------------------------------------------------------
  11. |0 |EXCHANGE IN DISTR | |1960200000|3090308367|
  12. |1 | EXCHANGE OUT DISTR | |1960200000|1327558071|
  13. |2 | MERGE JOIN | |1960200000|1327558071|
  14. |3 | EXCHANGE IN MERGE SORT DISTR| |400000 |436080 |
  15. |4 | EXCHANGE OUT DISTR (PKEY) | |400000 |256226 |
  16. |5 | TABLE SCAN |S |400000 |256226 |
  17. |6 | TABLE SCAN |T |500000 |320292 |
  18. ===============================================================
  19. Outputs & filters:
  20. -------------------------------------
  21. 0 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  22. 1 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil)
  23. 2 - output([S.C1], [S.C2], [T.C1], [T.C2]), filter(nil),
  24. equal_conds([S.C1 = T.C1]), other_conds(nil)
  25. 3 - output([S.C1], [S.C2]), filter(nil), sort_keys([S.C1, ASC])
  26. 4 - (#keys=1, [S.C1]), output([S.C1], [S.C2]), filter(nil)
  27. 5 - output([S.C1], [S.C2]), filter(nil),
  28. access([S.C1], [S.C2]), partitions(p[0-3])
  29. 6 - output([T.C1], [T.C2]), filter(nil),
  30. access([T.C1], [T.C2]), partitions(p[0-4])

上述示例的执行计划展示中的 outputs & filters 详细列出了 EXCH-IN/OUT (PKEY) 算子的输出信息如下:

信息名称

含义

output

该算子输出的表达式。

filter

该算子上的过滤条件。

由于示例中 EXCH-IN/OUT(PKEY)算子没有设置 filter,所以为 nil。

pkey

按照哪一列进行重分区。

例如,#keys=1, [s.c1] 表示按照 c1 这一列重分区

EXCH-IN/OUT (HASH)

EXCH-IN/OUT (HASH) 算子用于对数据使用一组 HASH 函数进行重分区。

如下例所示的执行计划中,3-5 号以及 7-8 号是两组使用 HASH 重分区的 EXCHANGE 算子。这两组算子的作用是把 t 表和 s 表的数据按照一组新的 HASH 函数打散成多份,在这个例子中 HASH 的列为 t.c2 和 s.c2,这保证了 c2 取值相同的行会被分发到同一份中。基于重分区之后的数据,2 号算子 HASH JOIN 会对每一份数据按照 t.c2= s.c2 进行联接。

此外,由于查询中执行了并行度为 2,计划中展示了 dop = 2 (dop 是 Degree of Parallelism 的缩写)。

  1. obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
  2. Query OK, 0 rows affected (0.12 sec)
  3. obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
  4. Query OK, 0 rows affected (0.12 sec)
  5. obclient>EXPLAIN SELECT /*+PQ_DISTRIBUTE(@"SEL$1" ("TEST.S"@"SEL$1" ) HASH HASH),
  6. PARALLEL(2)*/ * FROM t, s WHERE t.c2 = s.c2\G;
  7. *************************** 1. row ***************************
  8. Query Plan:
  9. =================================================================
  10. |ID|OPERATOR |NAME |EST. ROWS |COST |
  11. -----------------------------------------------------------------
  12. |0 |PX COORDINATOR | |1568160000|2473629500|
  13. |1 | EXCHANGE OUT DISTR |:EX10002|1568160000|1063429263|
  14. |2 | HASH JOIN | |1568160000|1063429263|
  15. |3 | EXCHANGE IN DISTR | |400000 |436080 |
  16. |4 | EXCHANGE OUT DISTR (HASH)|:EX10000|400000 |256226 |
  17. |5 | PX PARTITION ITERATOR | |400000 |256226 |
  18. |6 | TABLE SCAN |T |400000 |256226 |
  19. |7 | EXCHANGE IN DISTR | |400000 |436080 |
  20. |8 | EXCHANGE OUT DISTR (HASH)|:EX10001|400000 |256226 |
  21. |9 | PX PARTITION ITERATOR | |400000 |256226 |
  22. |10| TABLE SCAN |S |400000 |256226 |
  23. =================================================================
  24. Outputs & filters:
  25. -------------------------------------
  26. 0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  27. 1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  28. 2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
  29. equal_conds([T.C2 = S.C2]), other_conds(nil)
  30. 3 - output([T.C1], [T.C2]), filter(nil)
  31. 4 - (#keys=1, [T.C2]), output([T.C1], [T.C2]), filter(nil), dop=2
  32. 5 - output([T.C1], [T.C2]), filter(nil)
  33. 6 - output([T.C1], [T.C2]), filter(nil),
  34. access([T.C1], [T.C2]), partitions(p[0-3])
  35. 7 - output([S.C1], [S.C2]), filter(nil)
  36. 8 - (#keys=1, [S.C2]), output([S.C1], [S.C2]), filter(nil), dop=2
  37. 9 - output([S.C1], [S.C2]), filter(nil)
  38. 10 - output([S.C1], [S.C2]), filter(nil),
  39. access([S.C1], [S.C2]), partitions(p[0-3])

其中,PX PARTITION ITERATO 算子用于按照分区粒度迭代数据,详细信息请参见 GI

上述示例的执行计划展示中的 outputs & filters 详细列出了 EXCH-IN/OUT (HASH) 算子的输出信息如下:

信息名称

含义

output

该算子输出的表达式。

filter

该算子上的过滤条件。

由于示例中 EXCH-IN/OUT (HASH) 算子没有设置 filter,所以为 nil。

pkey

按照哪一列进行 HASH 重分区。

例如,#keys=1, [s.c2] 表示按照 c2 这一列进行 HASH 重分区。

EXCH-IN/OUT(BROADCAST)

EXCH-IN/OUT(BROADCAST) 算子用于对输入数据使用 BROADCAST 的方法进行重分区,它会将数据广播到其他线程上。

如下示例的执行计划中,3-4 号是一组使用 BROADCAST 重分区方式的 EXCHANGE 算子。它会将 t 表的数据广播到每个线程上,s 表每个分区的数据都会尝试和被广播的 t 表数据进行联接。

  1. obclient>CREATE TABLE t (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
  2. Query OK, 0 rows affected (0.12 sec)
  3. obclient>CREATE TABLE s (c1 INT, c2 INT) PARTITION BY HASH(c1) PARTITIONS 4;
  4. Query OK, 0 rows affected (0.12 sec)
  5. obclient>INSERT INTO s VALUES (1, 1), (2, 2), (3, 3), (4, 4);
  6. Query OK, 1 rows affected (0.12 sec)
  7. obclient>EXPALIN SELECT /*+PARALLEL(2) */ * FROM t, s WHERE t.c2 = s.c2\G;
  8. *************************** 1. row ***************************
  9. Query Plan:
  10. ======================================================================
  11. |ID|OPERATOR |NAME |EST. ROWS |COST |
  12. ----------------------------------------------------------------------
  13. |0 |PX COORDINATOR | |1568160000|2473449646|
  14. |1 | EXCHANGE OUT DISTR |:EX10001|1568160000|1063249409|
  15. |2 | HASH JOIN | |1568160000|1063249409|
  16. |3 | EXCHANGE IN DISTR | |400000 |436080 |
  17. |4 | EXCHANGE OUT DISTR (BROADCAST)|:EX10000|400000 |256226 |
  18. |5 | PX PARTITION ITERATOR | |400000 |256226 |
  19. |6 | TABLE SCAN |T |400000 |256226 |
  20. |7 | PX PARTITION ITERATOR | |400000 |256226 |
  21. |8 | TABLE SCAN |S |400000 |256226 |
  22. ======================================================================
  23. Outputs & filters:
  24. -------------------------------------
  25. 0 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil)
  26. 1 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil), dop=2
  27. 2 - output([T.C1], [T.C2], [S.C1], [S.C2]), filter(nil),
  28. equal_conds([T.C2 = S.C2]), other_conds(nil)
  29. 3 - output([T.C1], [T.C2]), filter(nil)
  30. 4 - output([T.C1], [T.C2]), filter(nil), dop=2
  31. 5 - output([T.C1], [T.C2]), filter(nil)
  32. 6 - output([T.C1], [T.C2]), filter(nil),
  33. access([T.C1], [T.C2]), partitions(p[0-3])
  34. 7 - output([S.C1], [S.C2]), filter(nil)
  35. 8 - output([S.C1], [S.C2]), filter(nil),
  36. access([S.C1], [S.C2]), partitions(p[0-3])

上述示例的执行计划展示中的 outputs & filters 详细列出了 EXCH-IN/OUT (BROADCAST) 算子的信息,字段的含义与 EXCH-IN/OUT 算子相同。