DistributeTranspiler

class paddle.fluid.DistributeTranspiler(config=None)[源代码]

该类可以把fluid program转变为分布式数据并行计算的program, 有PServer和NCCL2两种模式。 在Pserver(全称:parameter server)模式下, 通过 transpile 将用于单机训练的 program 转译为可用于parameter server的分布式架构(即PServer,参数服务器)来进行训练的program。 在NCCL2模式下, 通过 transpile 将用于单机训练的 program 转译为可用于NCCL2的分布式架构来进行训练的program。在NCCL2模式下,transpiler会在 startup_program 中附加一个 NCCL_ID 广播 算子(broadcasting operators)来实现在该集群中所有工作结点共享``NCCL_ID`` 。 调用 transpile_nccl2 后, 你 必须trainer_id , num_trainers 参数提供给 Executor 来启动NCCL2分布式模式。

参数

  • config (DistributeTranspilerConfig) DistributeTranspiler属性配置实例,定义了program转变所需要的属性, 请参考:DistributeTranspilerConfig 相关文档。

返回

初始化后的DistributeTranspiler实例

返回类型

实例(DistributeTranspiler)

代码示例

  1. x = fluid.layers.data(name='x', shape=[13], dtype='float32')
  2. y = fluid.layers.data(name='y', shape=[1], dtype='float32')
  3. y_predict = fluid.layers.fc(input=x, size=1, act=None)
  4. cost = fluid.layers.square_error_cost(input=y_predict, label=y)
  5. avg_loss = fluid.layers.mean(cost)
  6. sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001)
  7. sgd_optimizer.minimize(avg_loss)
  8. # pserver 模式下
  9. pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  10. trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  11. current_endpoint = "192.168.0.1:6174"
  12. trainer_id = 0
  13. trainers = 4
  14. role = "PSERVER"
  15. t = fluid.DistributeTranspiler()
  16. t.transpile(
  17. trainer_id, pservers=pserver_endpoints, trainers=trainers)
  18. if role == "PSERVER":
  19. pserver_program = t.get_pserver_program(current_endpoint)
  20. pserver_startup_program = t.get_startup_program(current_endpoint,
  21. pserver_program)
  22. elif role == "TRAINER":
  23. trainer_program = t.get_trainer_program()
  24. # nccl2 模式下
  25. trainer_num = 2
  26. trainer_id = 0
  27. config = fluid.DistributeTranspilerConfig()
  28. config.mode = "nccl2"
  29. trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  30. t = fluid.DistributeTranspiler(config=config)
  31. t.transpile(trainer_id=trainer_id, trainers=trainer_endpoints, current_endpoint="192.168.0.1:6174")
  32. exe = fluid.ParallelExecutor(
  33. use_cuda=True,
  34. loss_name=avg_loss.name,
  35. num_trainers=trainer_num,
  36. trainer_id=trainer_id
  37. )

方法

transpile(trainer_id, program=None, pservers=’127.0.0.1:6174’, trainers=1, sync_mode=True, startup_program=None, current_endpoint=’127.0.0.1:6174’)

通过此方法,可根据用户配置将单机的program转换为当前节点可用的数据并行的分布式program。

参数

  • trainer_id (int) – 当前Trainer worker的id, 如果有n个Trainer worker, id 取值范围为0 ~ n-1
  • program (Program|None) – 待transpile(转译)的program, 缺省为 fluid.default_main_program()
  • startup_program (Program|None) - 要转译的 startup_program ,默认为 fluid.default_startup_program()
  • pservers (str) – 内容为Pserver列表的字符串,格式为:按逗号区分不同的Pserver,每个Pserver的格式为 ip地址:端口号
  • trainers (int|str) – 在Pserver模式下,该参数指Trainer机的个数;在nccl2模式下,它是一个内容为Trainer终端列表的字符串
  • sync_mode (bool) – 是否做同步训练(synchronous training), 默认为True
  • startup_program (Program|None) – 待transpile(转译)的startup_program,默认为 fluid.default_main_program()
  • current_endpoint (str) – 当需要把program转译(transpile)至NCCL2模式下时,需要将当前endpoint(终端)传入该参数。PServer模型下,当用户需要使用增量训练时,必须要指定该参数。

返回 None

代码示例

  1. transpiler = fluid.DistributeTranspiler()
  2. t.transpile(
  3. trainer_id=0,
  4. pservers="127.0.0.1:7000,127.0.0.1:7001",
  5. trainers=2,
  6. sync_mode=False,
  7. current_endpoint="127.0.0.1:7000")

get_trainer_program(wait_port=True)

该方法可以得到Trainer侧的program。

返回

Trainer侧的program

返回类型

Program

代码示例

  1. import paddle.fluid as fluid
  2. # 这是一个示例,请根据你的情况更改endpoint
  3. pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  4. trainer_id = 0
  5. trainers = 4
  6. t = fluid.DistributeTranspiler()
  7. t.transpile(trainer_id, trainers=trainers, pservers=pserver_endpoints)
  8. trainer_program = t.get_trainer_program()

get_pserver_program(endpoint)

该方法可以得到Pserver(参数服务器)侧的程序

参数

  • endpoint (str) – 当前Pserver终端

返回

当前Pserver需要执行的program

返回类型

Program

代码示例

  1. import paddle.fluid as fluid
  2. # 这是一个示例,请根据你的情况更改endpoint
  3. pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  4. current_endpoint = "192.168.0.1:6174"
  5. trainer_id = 0
  6. trainers = 4
  7. t = fluid.DistributeTranspiler()
  8. t.transpile(
  9. trainer_id, pservers=pserver_endpoints, trainers=trainers)
  10. pserver_program = t.get_pserver_program(current_endpoint)

get_pserver_programs(endpoint)

该方法可以得到Pserver侧用于分布式训练的 main_programstartup_program

参数

  • endpoint (str) – 当前Pserver终端

返回

(main_program, startup_program), “Program”类型的元组

返回类型

tuple

代码示例

  1. import paddle.fluid as fluid
  2. # 这是一个示例,请根据你的情况更改endpoint
  3. pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  4. current_endpoint = "192.168.0.1:6174"
  5. trainer_id = 0
  6. trainers = 4
  7. t = fluid.DistributeTranspiler()
  8. t.transpile(
  9. trainer_id, pservers=pserver_endpoints, trainers=trainers)
  10. pserver_program, pserver_startup_program = t.get_pserver_programs(current_endpoint)

get_startup_program(endpoint, pserver_program=None, startup_program=None)

该函数已停止使用 获取当前Pserver的startup_program,如果有多个被分散到不同blocks的变量,则修改operator的输入变量。

参数

  • endpoint (str) – 当前Pserver终端
  • pserver_program (Program) – 已停止使用。 先调用get_pserver_program
  • startup_program (Program) – 已停止使用。应在初始化时传入startup_program

返回

Pserver侧的startup_program

返回类型

Program

代码示例

  1. pserver_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  2. trainer_endpoints = "192.168.0.1:6174,192.168.0.2:6174"
  3. current_endpoint = "192.168.0.1:6174"
  4. trainer_id = 0
  5. trainers = 4
  6. t = fluid.DistributeTranspiler()
  7. t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
  8. pserver_program = t.get_pserver_program(current_endpoint)
  9. pserver_startup_program = t.get_startup_program(current_endpoint,
  10. pserver_program)