Quick Start for Distributed Training

FleetX is the distributed training extension package for PaddlePaddle. To help users understand and use PaddlePaddle's distributed training features more quickly, it provides a large number of distributed training examples at https://github.com/PaddlePaddle/FleetX/tree/develop/examples. All of the examples in the following sections can be found there; users can also clone the repository locally and run them directly.

1. Quick Start for Collective Training

This section uses ResNet50, a classic CV model, as an example to show how to complete a collective training task with the Fleet API (paddle.distributed.fleet). For data we use Paddle's built-in flowers dataset, and the optimizer is Momentum. Training iterates over multiple epochs, printing the network's current loss and accuracy in each round. The full code lives under FleetX/examples/resnet and covers both dynamic-graph and static-graph execution: resnet_dygraph.py defines the dynamic-graph model and train_fleet_dygraph.py is the dynamic-graph training script, while resnet_static.py defines the static-graph model and train_fleet_static.py is the static-graph training script.

1.1 Version Requirements

Before writing a distributed training program, make sure that paddlepaddle-2.0.0-rc-cpu or paddlepaddle-2.0.0-rc-gpu (or a later version) of the PaddlePaddle open-source framework is installed.

1.2 How to Use

Compared with ordinary single-machine, single-GPU training, collective training code (whether static graph or dynamic graph) only needs three additions:

  1. Import the dependencies required for distributed training.

  2. Initialize the Fleet environment.

  3. Set up the optimizer for distributed training.

Each step is explained below.

1.2.1 Import Dependencies

Import the necessary dependencies, such as the Fleet API (paddle.distributed.fleet) dedicated to distributed training.

  from paddle.distributed import fleet

1.2.2 Initialize the Fleet Environment

This means defining a default distributed strategy, then setting the parameter is_collective to True so that training uses the collective architecture.

  strategy = fleet.DistributedStrategy()
  fleet.init(is_collective=True, strategy=strategy)
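After initialization, each process can sanity-check the distributed environment it was given. A minimal sketch, assuming the script is launched via paddle.distributed.launch as in Section 1.5 (worker_index() and worker_num() are Fleet query interfaces, not part of the original example):

  # Verify the distributed environment after fleet.init():
  # worker_index() is this process's rank, worker_num() the total trainer count.
  print("trainer {} of {}".format(fleet.worker_index(), fleet.worker_num()))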

1.2.3 Set Up the Optimizer for Distributed Training

Use distributed_optimizer to wrap the base optimizer for distributed training.

  optimizer = fleet.distributed_optimizer(optimizer)
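For context, here is a minimal sketch of this step, assuming a Momentum base optimizer as in the full examples below; any optimizer from paddle.optimizer can be wrapped the same way:

  import paddle
  from paddle.distributed import fleet

  # Build an ordinary single-process optimizer first ...
  optimizer = paddle.optimizer.Momentum(learning_rate=0.1, momentum=0.9)
  # ... then wrap it so Fleet handles gradient synchronization across trainers.
  optimizer = fleet.distributed_optimizer(optimizer)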

1.3 Complete Dynamic-Graph Code

The complete training code of train_fleet_dygraph.py is shown below.

  # -*- coding: UTF-8 -*-
  import numpy as np
  import argparse
  import ast
  import paddle
  # Import the dependencies required for distributed training
  from paddle.distributed import fleet
  # Import the model file
  from resnet_dygraph import ResNet

  base_lr = 0.1        # learning rate
  momentum_rate = 0.9  # momentum
  l2_decay = 1e-4      # weight decay
  epoch = 10           # number of training epochs
  batch_size = 32      # batch size
  class_dim = 102

  # Define the data reader
  def reader_decorator(reader):
      def __reader__():
          for item in reader():
              img = np.array(item[0]).astype('float32').reshape(3, 224, 224)
              label = np.array(item[1]).astype('int64').reshape(1)
              yield img, label

      return __reader__

  # Define the optimizer
  def optimizer_setting(parameter_list=None):
      optimizer = paddle.optimizer.Momentum(
          learning_rate=base_lr,
          momentum=momentum_rate,
          weight_decay=paddle.regularizer.L2Decay(l2_decay),
          parameters=parameter_list)
      return optimizer

  # Define the training function
  def train_resnet():
      # Initialize the Fleet environment
      fleet.init(is_collective=True)

      resnet = ResNet(class_dim=class_dim, layers=50)

      optimizer = optimizer_setting(parameter_list=resnet.parameters())
      optimizer = fleet.distributed_optimizer(optimizer)
      # Obtain the distributed model through the Fleet API to support distributed training
      resnet = fleet.distributed_model(resnet)

      train_reader = paddle.batch(
          reader_decorator(paddle.dataset.flowers.train(use_xmap=True)),
          batch_size=batch_size,
          drop_last=True)

      train_loader = paddle.io.DataLoader.from_generator(
          capacity=32,
          use_double_buffer=True,
          iterable=True,
          return_list=True,
          use_multiprocess=True)
      train_loader.set_sample_list_generator(train_reader)

      for eop in range(epoch):
          resnet.train()

          for batch_id, data in enumerate(train_loader()):
              img, label = data
              label.stop_gradient = True

              out = resnet(img)
              loss = paddle.nn.functional.cross_entropy(input=out, label=label)
              avg_loss = paddle.mean(x=loss)
              acc_top1 = paddle.metric.accuracy(input=out, label=label, k=1)
              acc_top5 = paddle.metric.accuracy(input=out, label=label, k=5)

              dy_out = avg_loss.numpy()
              avg_loss.backward()
              optimizer.minimize(avg_loss)
              resnet.clear_gradients()

              if batch_id % 5 == 0:
                  print("[Epoch %d, batch %d] loss: %.5f, acc1: %.5f, acc5: %.5f" % (eop, batch_id, dy_out, acc_top1, acc_top5))

  # Start training
  if __name__ == '__main__':
      train_resnet()

1.4 Complete Static-Graph Code

The complete training code of train_fleet_static.py is shown below.

  # -*- coding: UTF-8 -*-
  import numpy as np
  import argparse
  import ast
  import paddle
  # Import the dependencies required for distributed training
  import paddle.distributed.fleet as fleet
  # Import the model file
  import resnet_static as resnet
  import os

  base_lr = 0.1        # learning rate
  momentum_rate = 0.9  # momentum
  l2_decay = 1e-4      # weight decay
  epoch = 10           # number of training epochs
  batch_size = 32      # batch size
  class_dim = 10

  # Define the optimizer
  def optimizer_setting(parameter_list=None):
      optimizer = paddle.optimizer.Momentum(
          learning_rate=base_lr,
          momentum=momentum_rate,
          weight_decay=paddle.regularizer.L2Decay(l2_decay),
          parameters=parameter_list)
      return optimizer

  # Define the data reader
  def get_train_loader(feed_list, place):
      def reader_decorator(reader):
          def __reader__():
              for item in reader():
                  img = np.array(item[0]).astype('float32').reshape(3, 224, 224)
                  label = np.array(item[1]).astype('int64').reshape(1)
                  yield img, label

          return __reader__

      train_reader = paddle.batch(
          reader_decorator(paddle.dataset.flowers.train(use_xmap=True)),
          batch_size=batch_size,
          drop_last=True)
      train_loader = paddle.io.DataLoader.from_generator(
          capacity=32,
          use_double_buffer=True,
          feed_list=feed_list,
          iterable=True)
      train_loader.set_sample_list_generator(train_reader, place)
      return train_loader

  # Define the training function
  def train_resnet():
      print("Start collective training example:")
      paddle.enable_static()  # Enable static-graph mode
      paddle.vision.set_image_backend('cv2')

      image = paddle.static.data(name="x", shape=[None, 3, 224, 224], dtype='float32')
      label = paddle.static.data(name="y", shape=[None, 1], dtype='int64')
      # Build the ResNet50 model
      model = resnet.ResNet(layers=50)
      out = model.net(input=image, class_dim=class_dim)
      avg_cost = paddle.nn.functional.cross_entropy(input=out, label=label)
      acc_top1 = paddle.metric.accuracy(input=out, label=label, k=1)
      acc_top5 = paddle.metric.accuracy(input=out, label=label, k=5)
      # Set the training device; this example uses GPUs
      place = paddle.CUDAPlace(int(os.environ.get('FLAGS_selected_gpus', 0)))
      print("Run on {}.".format(place))

      train_loader = get_train_loader([image, label], place)

      # Initialize the Fleet environment
      strategy = fleet.DistributedStrategy()
      fleet.init(is_collective=True, strategy=strategy)
      optimizer = optimizer_setting()
      # Obtain the distributed optimizer through the Fleet API, passing in the base Paddle optimizer
      optimizer = fleet.distributed_optimizer(optimizer)
      optimizer.minimize(avg_cost)

      exe = paddle.static.Executor(place)
      print("Execute startup program.")
      exe.run(paddle.static.default_startup_program())

      epoch = 10
      step = 0
      for eop in range(epoch):
          for batch_id, data in enumerate(train_loader()):
              loss, acc1, acc5 = exe.run(paddle.static.default_main_program(), feed=data, fetch_list=[avg_cost.name, acc_top1.name, acc_top5.name])
              if batch_id % 5 == 0:
                  print("[Epoch %d, batch %d] loss: %.5f, acc1: %.5f, acc5: %.5f" % (eop, batch_id, loss, acc1, acc5))

  # Start training
  if __name__ == '__main__':
      train_resnet()

1.5 Running the Example

Suppose you want to run a 2-GPU job; simply execute on the command line:

Dynamic graph:

  python3 -m paddle.distributed.launch --gpus=0,1 train_fleet_dygraph.py

You will see log output like the following:

  ----------- Configuration Arguments -----------
  gpus: 0,1
  heter_worker_num: None
  heter_workers:
  http_port: None
  ips: 127.0.0.1
  log_dir: log
  nproc_per_node: None
  server_num: None
  servers:
  training_script: train_fleet_dygraph.py
  training_script_args: []
  worker_num: None
  workers:
  ------------------------------------------------
  WARNING 2021-05-06 11:32:50,804 launch.py:316] Not found distinct arguments and compiled with cuda. Default use collective mode
  launch train in GPU mode
  INFO 2021-05-06 11:32:50,806 launch_utils.py:472] Local start 2 processes. First process distributed environment info (Only For Debug):
  +=======================================================================================+
  |                     Distributed Envs                          Value                   |
  +---------------------------------------------------------------------------------------+
  |             PADDLE_TRAINER_ENDPOINTS           127.0.0.1:20923,127.0.0.1:10037        |
  |                  FLAGS_selected_gpus                            0                     |
  |                    PADDLE_TRAINER_ID                            0                     |
  |                  PADDLE_TRAINERS_NUM                            2                     |
  |              PADDLE_CURRENT_ENDPOINT                    127.0.0.1:20923               |
  +=======================================================================================+
  INFO 2021-05-06 11:32:50,806 launch_utils.py:475] details abouts PADDLE_TRAINER_ENDPOINTS can be found in log/endpoints.log, and detail running logs maybe found in log/workerlog.0
  grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
  I0506 11:32:51.828132  6427 nccl_context.cc:189] init nccl context nranks: 2 local rank: 0 gpu id: 0 ring id: 0
  W0506 11:32:52.365190  6427 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 11.0
  W0506 11:32:52.368203  6427 device_context.cc:372] device: 0, cuDNN Version: 8.0.
  [Epoch 0, batch 0] loss: 4.98047, acc1: 0.00000, acc5: 0.00000
  [Epoch 0, batch 5] loss: 39.06348, acc1: 0.03125, acc5: 0.09375
  ...

Static graph:

  python3 -m paddle.distributed.launch --gpus=0,1 train_fleet_static.py

You will see log output like the following:

  ----------- Configuration Arguments -----------
  gpus: 0,1
  heter_worker_num: None
  heter_workers:
  http_port: None
  ips: 127.0.0.1
  log_dir: log
  nproc_per_node: None
  server_num: None
  servers:
  training_script: train_fleet_static.py
  training_script_args: []
  worker_num: None
  workers:
  ------------------------------------------------
  WARNING 2021-05-06 11:36:30,019 launch.py:316] Not found distinct arguments and compiled with cuda. Default use collective mode
  launch train in GPU mode
  INFO 2021-05-06 11:36:30,021 launch_utils.py:472] Local start 2 processes. First process distributed environment info (Only For Debug):
  +=======================================================================================+
  |                     Distributed Envs                          Value                   |
  +---------------------------------------------------------------------------------------+
  |                    PADDLE_TRAINER_ID                            0                     |
  |              PADDLE_CURRENT_ENDPOINT                    127.0.0.1:10039               |
  |             PADDLE_TRAINER_ENDPOINTS           127.0.0.1:10039,127.0.0.1:31719        |
  |                  PADDLE_TRAINERS_NUM                            2                     |
  |                  FLAGS_selected_gpus                            0                     |
  +=======================================================================================+
  INFO 2021-05-06 11:36:30,021 launch_utils.py:475] details abouts PADDLE_TRAINER_ENDPOINTS can be found in log/endpoints.log, and detail running logs maybe found in log/workerlog.0
  grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
  Start collective training example:
  Run on CUDAPlace(0).
  server not ready, wait 3 sec to retry...
  not ready endpoints:['127.0.0.1:31719']
  Execute startup program.
  W0506 11:36:35.667778  6697 device_context.cc:362] Please NOTE: device: 0, GPU Compute Capability: 7.0, Driver API Version: 11.0, Runtime API Version: 11.0
  W0506 11:36:35.671609  6697 device_context.cc:372] device: 0, cuDNN Version: 8.0.
  Start training:
  W0506 11:36:39.900507  6697 fuse_all_reduce_op_pass.cc:79] Find all_reduce operators: 161. To make the speed faster, some all_reduce ops are fused during training, after fusion, the number of all_reduce ops is 5.
  [Epoch 0, batch 0] loss: 4.67622, acc1: 0.00000, acc5: 0.09375
  [Epoch 0, batch 5] loss: 30.24010, acc1: 0.00000, acc5: 0.06250
  ...

Moving from single-machine multi-GPU to multi-machine multi-GPU training requires no code changes; simply specify the additional ips argument, whose value is the list of machine IPs, as shown below:

  # dynamic graph
  python3 -m paddle.distributed.launch --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus 0,1,2,3,4,5,6,7 train_fleet_dygraph.py
  # static graph
  python3 -m paddle.distributed.launch --ips="xx.xx.xx.xx,yy.yy.yy.yy" --gpus 0,1,2,3,4,5,6,7 train_fleet_static.py

2. Quick Start for ParameterServer Training

This section uses wide_and_deep, a classic model in the recommendation domain, as an example to show how to complete a parameter-server training task with the Fleet API (paddle.distributed.fleet). The complete example code for this quick start is located at https://github.com/PaddlePaddle/FleetX/tree/develop/examples/wide_and_deep

2.1 Version Requirements

Before writing a distributed training program, make sure that paddlepaddle-2.0.0-rc-cpu or paddlepaddle-2.0.0-rc-gpu (or a later version) of the PaddlePaddle open-source framework is installed.

2.2 How to Use

The basic code for parameter-server training consists of the following parts:

  1. Import the dependencies required for distributed training.

  2. Define the distributed mode and initialize the distributed training environment.

  3. Load the model and data.

  4. Define the parameter-update strategy and optimizer.

  5. Start training.

Each step is explained below.

2.2.1 Import Dependencies

Import the necessary dependencies, such as the Fleet API (paddle.distributed.fleet) dedicated to distributed training.

  import paddle
  import paddle.distributed.fleet as fleet

2.2.2 Define the Distributed Mode and Initialize the Distributed Training Environment

Through the fleet.init() interface, users define the training environment. Note that this environment is configured in advance via environment variables, including the number of trainer nodes, the number of server nodes, the rank of the current node, and the complete IP:PORT list of the server nodes.

  # The parameter-server mode currently supports the static graph mode only,
  # so paddle.enable_static() must be called before training
  paddle.enable_static()
  fleet.init(is_collective=False)
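The launcher described in Section 2.3 sets these environment variables before the script runs. As a rough illustration (the variable names below are taken from the launch logs shown later in this section), a process can inspect its assigned role and the cluster topology like this:

  import os

  # TRAINING_ROLE distinguishes PSERVER processes from TRAINER processes;
  # the remaining variables describe the cluster topology.
  for name in ("TRAINING_ROLE",
               "PADDLE_PSERVERS_IP_PORT_LIST",
               "PADDLE_TRAINER_ENDPOINTS",
               "PADDLE_TRAINERS_NUM"):
      print(name, "=", os.environ.get(name))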

2.2.3 Load the Model and Data

  # See model.py under examples/wide_and_deep for the model definition
  import numpy as np  # needed by np.mean below
  from model import WideDeepModel
  from reader import WideDeepDataset

  model = WideDeepModel()
  model.net(is_train=True)

  def distributed_training(exe, train_model, train_data_path="./data", batch_size=10, epoch_num=1):
      train_data = WideDeepDataset(data_path=train_data_path)
      reader = train_model.loader.set_sample_generator(
          train_data, batch_size=batch_size, drop_last=True, places=paddle.CPUPlace())

      for epoch_id in range(epoch_num):
          reader.start()
          try:
              while True:
                  loss_val = exe.run(program=paddle.static.default_main_program(),
                                     fetch_list=[train_model.cost.name])
                  loss_val = np.mean(loss_val)
                  print("TRAIN ---> pass: {} loss: {}\n".format(epoch_id, loss_val))
          except paddle.common_ops_import.core.EOFException:
              reader.reset()

2.2.4 Define the Training Strategy and Optimizer

In the Fleet API, users can define the distributed strategy they want through the fleet.DistributedStrategy() interface.

The a_sync option controls the parameter-server strategy: when it is set to False, distributed training runs in synchronous mode; conversely, when it is set to True, distributed training runs in asynchronous mode.

  # Asynchronous training
  dist_strategy = fleet.DistributedStrategy()
  dist_strategy.a_sync = True

  # Synchronous training
  dist_strategy = fleet.DistributedStrategy()
  dist_strategy.a_sync = False

  # Geo asynchronous training; Geo mode currently supports only the SGD optimizer
  dist_strategy = fleet.DistributedStrategy()
  dist_strategy.a_sync = True
  dist_strategy.a_sync_configs = {"k_steps": 100}

  optimizer = paddle.optimizer.SGD(learning_rate=0.0001)
  optimizer = fleet.distributed_optimizer(optimizer, dist_strategy)
  optimizer.minimize(model.loss)

2.2.5 Start Training

With the model and training strategy defined, we can start training. Because parameter-server mode involves different roles, different tasks are assigned to different nodes.

For server nodes, first initialize with the init_server() interface, then start the service and listen for gradients sent by the trainer nodes.

Likewise, trainer nodes are initialized with the init_worker() interface and then run the training task. Call exe.run() to start training and obtain the loss value at each training step.

  if fleet.is_server():
      fleet.init_server()
      fleet.run_server()
  else:
      exe = paddle.static.Executor(paddle.CPUPlace())
      exe.run(paddle.static.default_startup_program())

      fleet.init_worker()
      distributed_training(exe, model)
      fleet.stop_worker()

2.3 Running the Training Script

Once the training script is defined, we can launch the distributed job with python3 -m paddle.distributed.launch. Here server_num and worker_num are the numbers of server nodes and trainer nodes, respectively. In this example there is 1 server node and 2 trainer nodes.

  python3 -m paddle.distributed.launch --server_num=1 --worker_num=2 --gpus=0,1 train.py

You will see log output like the following:

  ----------- Configuration Arguments -----------
  gpus: 0,1
  heter_worker_num: None
  heter_workers:
  http_port: None
  ips: 127.0.0.1
  log_dir: log
  nproc_per_node: None
  server_num: 1
  servers:
  training_script: train.py
  training_script_args: []
  worker_num: 2
  workers:
  ------------------------------------------------
  INFO 2021-05-06 12:14:26,890 launch.py:298] Run parameter-sever mode. pserver arguments:['--worker_num', '--server_num'], cuda count:8
  INFO 2021-05-06 12:14:26,892 launch_utils.py:973] Local server start 1 processes. First process distributed environment info (Only For Debug):
  +=======================================================================================+
  |                     Distributed Envs                          Value                   |
  +---------------------------------------------------------------------------------------+
  |                  PADDLE_TRAINERS_NUM                            2                     |
  |                        TRAINING_ROLE                        PSERVER                   |
  |                               POD_IP                       127.0.0.1                  |
  |               PADDLE_GLOO_RENDEZVOUS                            3                     |
  |         PADDLE_PSERVERS_IP_PORT_LIST                    127.0.0.1:34008               |
  |                          PADDLE_PORT                          34008                   |
  |                     PADDLE_WITH_GLOO                            0                     |
  |    PADDLE_HETER_TRAINER_IP_PORT_LIST                                                  |
  |             PADDLE_TRAINER_ENDPOINTS           127.0.0.1:18913,127.0.0.1:10025        |
  |            PADDLE_GLOO_HTTP_ENDPOINT                    127.0.0.1:23053               |
  |                  PADDLE_GLOO_FS_PATH                   /tmp/tmp8vqb8arq               |
  +=======================================================================================+
  INFO 2021-05-06 12:14:26,902 launch_utils.py:1041] Local worker start 2 processes. First process distributed environment info (Only For Debug):
  +=======================================================================================+
  |                     Distributed Envs                          Value                   |
  +---------------------------------------------------------------------------------------+
  |            PADDLE_GLOO_HTTP_ENDPOINT                    127.0.0.1:23053               |
  |               PADDLE_GLOO_RENDEZVOUS                            3                     |
  |         PADDLE_PSERVERS_IP_PORT_LIST                    127.0.0.1:34008               |
  |                     PADDLE_WITH_GLOO                            0                     |
  |             PADDLE_TRAINER_ENDPOINTS           127.0.0.1:18913,127.0.0.1:10025        |
  |                  FLAGS_selected_gpus                            0                     |
  |                  PADDLE_GLOO_FS_PATH                   /tmp/tmp8vqb8arq               |
  |                  PADDLE_TRAINERS_NUM                            2                     |
  |                        TRAINING_ROLE                        TRAINER                   |
  |                  XPU_VISIBLE_DEVICES                            0                     |
  |    PADDLE_HETER_TRAINER_IP_PORT_LIST                                                  |
  |                    PADDLE_TRAINER_ID                            0                     |
  |                 CUDA_VISIBLE_DEVICES                            0                     |
  |                  FLAGS_selected_xpus                            0                     |
  +=======================================================================================+
  INFO 2021-05-06 12:14:26,921 launch_utils.py:903] Please check servers, workers and heter_worker logs in log/workerlog.*, log/serverlog.* and log/heterlog.*
  INFO 2021-05-06 12:14:33,446 launch_utils.py:914] all workers exit, going to finish parameter server and heter_worker.
  INFO 2021-05-06 12:14:33,446 launch_utils.py:926] all parameter server are killed