PipelineOptimizer

注意:该API仅支持【静态图】模式

  • class paddle.fluid.optimizer.PipelineOptimizer(optimizer, cut_list=None, place_list=None, concurrency_list=None, queue_size=30, sync_steps=1, start_cpu_core_id=0)[源代码]

使用流水线模式进行训练。 Program会根据切分列表cut_list进行分割。如果cut_list的长度是k,则整个program(包括反向部分)将被分割为2k-1个section。 所以place_list和concurrency_list的长度也必须是2k-1。

注解

虽然我们在流水线训练模式中采用异步更新的方式来加速,但最终的效果会依赖于每条流水线的训练进程。我们将在未来尝试同步模式。

  • 参数:
    • optimizer (Optimizer) - 基础优化器,如SGD
    • cut_list (list of Variable list) - main_program的cut变量列表
    • place_list (list of Place) - 对应section运行所在的place
    • concurrency_list (list of int) - 指定每个section的并发度列表
    • queue_size (int) - 每个section都会消费其输入队列(in-scope queue)中的scope,并向输出队列(out-scope queue)产出scope。 此参数的作用就是指定队列的大小。 可选,默认值:30
    • sync_steps (int) - 不同显卡之间的同步周期数。可选,默认值:1
    • start_cpu_core_id (int) - 指定所使用的第一个CPU核的id。可选,默认值:0

代码示例

  1. import paddle.fluid as fluid
  2. import paddle.fluid.layers as layers
  3. x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
  4. y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
  5. emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
  6. emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
  7. concat = layers.concat([emb_x, emb_y], axis=1)
  8. fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
  9. loss = layers.reduce_mean(fc)
  10. optimizer = fluid.optimizer.SGD(learning_rate=0.5)
  11. optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
  12. cut_list=[[emb_x, emb_y], [loss]],
  13. place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
  14. concurrency_list=[1, 1, 4],
  15. queue_size=2,
  16. sync_steps=1,
  17. )
  18. optimizer.minimize(loss)
  19. place = fluid.CPUPlace()
  20. exe = fluid.Executor(place)
  21. exe.run(fluid.default_startup_program())
  22. filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
  23. dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
  24. dataset.set_use_var([x,y])
  25. dataset.set_batch_size(batch_size)
  26. dataset.set_filelist(filelist)
  27. exe.train_from_dataset(
  28. fluid.default_main_program(),
  29. dataset,
  30. thread=2,
  31. debug=False,
  32. fetch_list=[],
  33. fetch_info=[],
  34. print_period=1)