数据并行执行引擎
ParallelExecutor 是以数据并行的方式在多个节点上分别执行 Program 的执行器。用户可以通过Python脚本驱动 ParallelExecutor 执行, ParallelExecutor 的执行过程:
- 首先根据
Program、GPU卡的数目(或者CPU的核数)以及 BuildStrategy 构建SSA Graph和一个线程池; 执行过程中,根据Op的输入是否Ready决定是否执行该Op,这样可以使没有相互依赖的多个Op可在线程池中并行执行;
ParallelExecutor在构造时需要指定当前Program的设备类型,GPU或者CPU:使用
GPU执行:ParallelExecutor会自动检测当前机器可以使用GPU的个数,并在每个GPU上分别执行Program,用户也可以通过设置CUDA_VISIBLE_DEVICES环境变量来指定执行器可使用的GPU;使用
CPU多线程执行:ParallelExecutor会自动检测当前机器可利用的CPU核数,并将CPU核数作为执行器中线程的个数,每个线程分别执行Program,用户也可以通过设置CPU_NUM环境变量来指定当前训练使用的线程个数。ParallelExecutor支持模型训练和模型预测:模型训练:
ParallelExecutor在执行过程中对多个节点上的参数梯度进行聚合,然后进行参数的更新;模型预测:
ParallelExecutor在执行过程中各个节点独立运行当前的Program;ParallelExecutor在模型训练时支持两种模式的梯度聚合,AllReduce和Reduce:AllReduce模式下,ParallelExecutor调用AllReduce操作使多个节点上参数梯度完全相等,然后各个节点独立进行参数的更新;Reduce模式下,ParallelExecutor会预先将所有参数的更新分派到不同的节点上,在执行过程中ParallelExecutor调用Reduce操作将参数梯度在预先指定的节点上进行聚合,并进行参数更新,最后调用Broadcast操作将更新后的参数发送到其他节点。这两种模式通过build_strategy来指定,使用方法,请参考 BuildStrategy 。
注意 :如果在Reduce模式下使用 CPU 多线程执行 Program , Program 的参数在多个线程间是共享的,在某些模型上,Reduce模式可以大幅节省内存。
鉴于模型的执行速率和模型结构及执行器的执行策略有关,ParallelExecutor 允许你修改执行器的相关参数,例如线程池的规模( num_threads )、为清除临时变量 num_iteration_per_drop_scope 需要进行的循环次数。更多信息请参照 ExecutionStrategy 。
- # 注释:
- # - 如果你想在ParallelExecutor中指定用于运行的GPU卡,需要在环境中定义
- # CUDA_VISIBLE_DEVICES
- # - 如果你想在ParallelExecutor中使用多CPU来运行程序,需要在环境中定义
- # CPU_NUM
- # 首先创建Executor。
- place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
- exe = fluid.Executor(place)
- # 运行启动程序仅一次。
- exe.run(fluid.default_startup_program())
- # 定义train_exe和test_exe
- exec_strategy = fluid.ExecutionStrategy()
- exec_strategy.num_threads = dev_count * 4 # the size of thread pool.
- build_strategy = fluid.BuildStrategy()
- build_strategy.memory_optimize = True if memory_opt else False
- train_exe = fluid.ParallelExecutor(use_cuda=use_cuda,
- main_program=train_program,
- build_strategy=build_strategy,
- exec_strategy=exec_strategy,
- loss_name=loss.name)
- # 注释:对于test_exe,loss_name是不必要的。
- test_exe = fluid.ParallelExecutor(use_cuda=True,
- main_program=test_program,
- build_strategy=build_strategy,
- exec_strategy=exec_strategy,
- share_vars_from=train_exe)
- train_loss, = train_exe.run(fetch_list=[loss.name], feed=feed_dict)
- test_loss, = test_exe.run(fetch_list=[loss.name], feed=feed_dict)
- 相关API :