在OceanBase 1.x中,分布式执行计划的调度采用了比较简单的调度模型。在计划生成的最后阶段,将以Exchange节点为界,拆分成多个子计划,每个子计划被封装成为一个Job,根据计划树的执行顺序,得到每个Job之间的依赖关系,生成一个Job的执行顺序。在调度时,每次调度一个Job,执行Job的所有task,在Job完成后,生成在顺序上下一个Job的task并执行,直到执行完所有的Job。

    调度Job

    Job树最顶层的Job称为root job, 它由主线程执行,在执行分布式计划时,主线程启动root job,随后启动调度线程,由调度线程按照Job的顺序依次分层调度Job的执行。

    分层调度是比较简单的调度策略,按照Job的先后顺序,每次调度一个Job执行。每个Job执行的中间结果放入到中间结果管理器中,在每个Job执行之前生成该Job的执行Task,根据数据的分区和子Job的输出数据,可以做一些预处理的工作,例如分区的动态裁剪等。 调度逻辑比较简单,因执行资源不足造成死锁的风险相对较低,可以比较简单的使用一些动态裁剪技术。

    1. ==============================================
    2. |ID|OPERATOR |NAME|EST. ROWS|COST |
    3. ----------------------------------------------
    4. |0 |HASH JOIN | |31680 |29835|
    5. |1 | EXCHANGE IN DISTR | |4000 |878 |
    6. |2 | EXCHANGE OUT DISTR| |4000 |499 |
    7. |3 | TABLE SCAN |t2 |4000 |499 |
    8. |4 | EXCHANGE IN DISTR | |4000 |878 |
    9. |5 | EXCHANGE OUT DISTR| |4000 |499 |
    10. |6 | TABLE SCAN |t3 |4000 |499 |
    11. ==============================================

    以如上计划为例,第0,1,4行会是root job,2-3行会是一个job,5-6行会是另一个job。在执行时,root job被启动,然后调度线程会先调度2-3行的job扫描t2并将结果写到中间结果管理器。此后root job会从各机器上的中间结果管理器将数据独取出来,此后调度线程会在2-3行的job的所有task执行完成之后,调度5-6行的job执行。root job的hash join算子会根据算子逻辑去中间结果管理区拉取t3的扫描结果,最终完成执行。

    执行job

    在job被调度之后,面临的问题就是执行job的操作。每一个Job可以被拆分成可以并行执行的一个或多个task,在拆分成多个task之后,可以根据并行度的设定,去并行的执行这些task。

    拆分task

    一个job的输入数据可能来自于scan算子或子job的输出数据,拆分task的基本流程就是将所有输入数据参照物理分区或者子Job的输出的数据分区特征进行分组,然后为每组数据分配一个task进行处理。由于算子执行和task执行机制的限制,分组过程有以下限制:

    • 每个scan算子在同一个分组中最多只能包含一个物理分区。
    • 如果多个scan算子在同一个分组中包含多个物理分区,必须保证所有物理分区的leader在同一个server上。

    输入数据分组是根据物理分区的数据特征分组,将所有具有相同数据特征的分区划分为一组。在子Job输出数据时,也会根据一定的规则进行数据的重分布,通过这样的重分布获得的不管是物理分区还是动态分区,都视为数据的一种分组。每一组将作为一个task,可以按照并行度的设置并行的分发执行。

    task裁剪

    • 静态分区裁剪

    根据查询的语意,如果查询中可以明确确定所牵涉的分区数量,例如以分区键作为查询的一个过滤条件,则可以在计划生成阶段得出实际需要访问的分区列表,并且在生成Job的task的时候,不需要的分区则不会生成对应的task。

    • 动态分区裁剪

    在完成输入数据分组后,并不是所有的task都需要被执行,如果对于某些输入分组的输出数据一定为空的话,那么可以在生成task时将这样的task裁剪掉,以节约执行时间和资源消耗,提高执行效率。子job的输出数据划分为多个动态分区后,可以直接裁剪掉所有空分区,降低需要进行分组的分区数量。

    如下例所示,两个表进行inner join操作,在将第一个表的数据扫描并将输出数据分组后,对于某些没有数据的分区,在生成执行任务时,不需要生成对应分区的task。分布式执行计划调度 - 图1

    在t2表的四个执行task扫描时,发现最终按照t1的分区进行数据重分布,只有p0和p2有数据,而t1表只有p0和p3有数据,则只需要生成一个join task{t1p0, s10, s20}

    执行task

    一个task需要由调度线程发送到物理分区所在服务器上,由工作线程执行,并在结束后由工作线程通知调度线程。task输出数据由Job顶端的exchange out算子进行动态分区后写入服务器本地的中间结果管理器。当上层Job的task被调度执行时,执行task的线程将通过exchange in算子从远端服务器的中间结果管理器读取数据。

    root job由主线程执行,可随时与调度线程进行通信,所以root job的receive算子直接从调度线程获取需要读取的子job信息。普通job由工作线程执行,在拆分task时就确定需要读取的子job信息并记录在task中,普通job的receive算子只能从task信息中获取需要读取的子job信息。

    并行度

    并行度控制task可以同时被以多大的数量并行执行,目前在OceanBase中,可以通过设置session变量ob_stmt_parallel_degree或者SQL的hint(parallel(xx))的方式来控制。最终的并行度会是min(# of tasks, specified_parallel_degree)。需要注意的是,一味的提高执行的并行度并不一定总能提高执行响应时间,如果系统资源的使用已经在一个比较高的水位,提高并行度去试图争抢更多的执行资源,有时候会得到适得其反的效果。