简介

分布式执行计划的简单调度模型为,在计划生成的最后阶段,将以 Exchange 节点为界,拆分成多个子计划,每个子计划被封装成为一个 Job,根据计划树的执行顺序,得到每个 Job 之间的依赖关系,生成一个 Job 的执行顺序。在调度时,每次调度一个 Job,执行 Job 的所有 task,在 Job 完成后,生成在顺序上下一个 Job的 task 并执行,直到执行完所有的 Job。

调度Job

Job 树最顶层的 Job 称为 root job, 它由主线程执行,在执行分布式计划时,主线程启动 root job,随后启动调度线程,由调度线程按照 Job 的顺序依次分层调度 Job 的执行。

分层调度

分层调度是比较简单的调度策略,按照 Job 的先后顺序,每次调度一个 Job 执行。每个 Job 执行的中间结果放入到中间结果管理器中,在每个 Job 执行之前生成该 Job 的执行 Task,根据数据的分区和子 Job的输出数据,可以做一些预处理的工作,例如分区的动态裁剪等。 调度逻辑比较简单,因执行资源不足造成死锁的风险相对较低,可以比较简单的使用一些动态裁剪技术。

以如下例所示,第0,1,4行会是 root job,2-3行会是一个 job,5-6行会是另一个 job。在执行时,root job 被启动,然后调度线程会先调度 2-3行的 job 扫描 t2 并将结果写到中间结果管理器。此后 root job 会从各机器上的中间结果管理器将数据独取出来,此后调度线程会在 2-3行的 job 的所有 task 执行完成之后,调度 5-6行的 job 执行。root job的 hash join 算子会根据算子逻辑去中间结果管理区拉取 t3 的扫描结果,最终完成执行。

  1. ==============================================
  2. |ID|OPERATOR |NAME|EST. ROWS|COST |
  3. ----------------------------------------------
  4. |0 |HASH JOIN | |31680 |29835|
  5. |1 | EXCHANGE IN DISTR | |4000 |878 |
  6. |2 | EXCHANGE OUT DISTR| |4000 |499 |
  7. |3 | TABLE SCAN |t2 |4000 |499 |
  8. |4 | EXCHANGE IN DISTR | |4000 |878 |
  9. |5 | EXCHANGE OUT DISTR| |4000 |499 |
  10. |6 | TABLE SCAN |t3 |4000 |499 |
  11. ==============================================

执行job

在 job 被调度之后,面临的问题就是执行 job 的操作。每一个job可以被拆分成可以并行执行的一个或多个 task,在拆分成多个 task 之后,可以根据并行度的设定,去并行的执行这些 task。

拆分task

一个 job 的输入数据可能来自于 scan 算子或子 job 的输出数据,拆分 task 的基本流程就是将所有输入数据参照物理分区或者子 job 的输出的数据分区特征进行分组,然后为每组数据分配一个 task 进行处理。
由于算子执行和 task 执行机制的限制,分组过程有以下限制:

  • 每个 scan 算子在同一个分组中最多只能包含一个物理分区;
  • 如果多个 scan 算子在同一个分组中包含多个物理分区,必须保证所有物理分区的 leader 在同一个 server 上。

输入数据分组即根据物理分区的数据特征分组,将所有具有相同数据特征的分区划分为一组。在子 job 输出数据时,也会根据一定的规则进行数据的重分布,通过这样的重分布获得的不管是物理分区还是动态分区,都视为数据的一种分组。每一组将作为一个task,可以按照并行度的设置并行的分发执行。

裁剪task

  • 静态分区裁剪

根据查询的语意,如果查询中可以明确确定所牵涉的分区数量,例如以分区键作为查询的一个过滤条件,则可以在计划生成阶段得出实际需要访问的分区列表,并且在生成 Job 的 task 的时候,不需要的分区则不会生成对应的 task。

  • 动态分区裁剪

在完成输入数据分组后,并不是所有的 task 都需要被执行,如果对于某些输入分组的输出数据一定为空的话,那么可以在生成 task 时将这样的 task 裁剪掉,以节约执行时间和资源消耗,提高执行效率。子 job 的输出数据划分为多个动态分区后,可以直接裁剪掉所有空分区,降低需要进行分组的分区数量。

如下例所示,两个表进行 inner join 操作,在将第一个表的数据扫描并将输出数据分组后,对于某些没有数据的分区,在生成执行任务时,不需要生成对应分区的 task。

image

在 t2 表的四个执行 task 扫描时,发现最终按照 t1 的分区进行数据重分布,只有 p0 和 p2 有数据,而 t1 表只有 p0 和 p3有数据,则只需要生成一个 join task{t1p0, s10, s20}。

执行task

一个 task 需要由调度线程发送到物理分区所在服务器上,由工作线程执行,并在结束后由工作线程通知调度线程。
task 输出数据由 job 顶端的 exchange out 算子进行动态分区后写入服务器本地的中间结果管理器。当上层 job 的 task 被调度执行时,执行 task 的线程将通过 exchange in 算子从远端服务器的中间结果管理器读取数据。

root job 由主线程执行,可随时与调度线程进行通信,所以 root job 的 receive 算子直接从调度线程获取需要读取的子 job 信息。普通 job 由工作线程执行,在拆分 task 时就确定需要读取的子 job 信息并记录在 task 中,普通 job 的 receive算子只能从 task 信息中获取需要读取的子 job 信息。

控制 task 并行度

并行度控制 task 可以同时被多大的数量并行执行。目前在 OceanBase 中,可以通过设置 session 变量ob_stmt_parallel_degree 或者 SQL 的 hint(parallel(xx)) 的方式来控制。最终的并行度会是 min(# of tasks, specified_parallel_degree)。

需要注意的是,一味的提高执行的并行度并不一定总能提高执行响应时间,如果系统资源的使用已经在一个比较高的水位,提高并行度去试图争抢更多的执行资源,有时候会得到适得其反的效果。