概述

从前几节的训练看,无论是房价预测任务还是MNIST手写字数字识别任务,训练好一个模型不会超过十分钟,主要原因是我们所使用的神经网络比较简单。但实际应用时,常会遇到更加复杂的机器学习或深度学习任务,需要运算速度更高的硬件(如GPU、NPU),甚至同时使用多个机器共同训练一个任务(多卡训练和多机训练)。本节我们依旧横向展开"横纵式"教学方法,如 图1 所示,探讨在手写数字识别任务中,通过资源配置的优化,提升模型训练效率的方法。

【手写数字识别】之资源配置 - 图1

图1:“横纵式”教学法 — 资源配置

前提条件

在优化算法之前,需要进行数据处理、设计神经网络结构,代码与上一节保持一致,如下所示。如果读者已经掌握了这部分内容,可以直接阅读正文部分。

  1. # 加载相关库
  2. import os
  3. import random
  4. import paddle
  5. import paddle.fluid as fluid
  6. from paddle.fluid.dygraph.nn import Conv2D, Pool2D, Linear
  7. import numpy as np
  8. from PIL import Image
  9. import gzip
  10. import json
  11. # 定义数据集读取器
  12. def load_data(mode='train'):
  13. # 读取数据文件
  14. datafile = './work/mnist.json.gz'
  15. print('loading mnist dataset from {} ......'.format(datafile))
  16. data = json.load(gzip.open(datafile))
  17. # 读取数据集中的训练集,验证集和测试集
  18. train_set, val_set, eval_set = data
  19. # 数据集相关参数,图片高度IMG_ROWS, 图片宽度IMG_COLS
  20. IMG_ROWS = 28
  21. IMG_COLS = 28
  22. # 根据输入mode参数决定使用训练集,验证集还是测试
  23. if mode == 'train':
  24. imgs = train_set[0]
  25. labels = train_set[1]
  26. elif mode == 'valid':
  27. imgs = val_set[0]
  28. labels = val_set[1]
  29. elif mode == 'eval':
  30. imgs = eval_set[0]
  31. labels = eval_set[1]
  32. # 获得所有图像的数量
  33. imgs_length = len(imgs)
  34. # 验证图像数量和标签数量是否一致
  35. assert len(imgs) == len(labels), \
  36. "length of train_imgs({}) should be the same as train_labels({})".format(
  37. len(imgs), len(labels))
  38. index_list = list(range(imgs_length))
  39. # 读入数据时用到的batchsize
  40. BATCHSIZE = 100
  41. # 定义数据生成器
  42. def data_generator():
  43. # 训练模式下,打乱训练数据
  44. if mode == 'train':
  45. random.shuffle(index_list)
  46. imgs_list = []
  47. labels_list = []
  48. # 按照索引读取数据
  49. for i in index_list:
  50. # 读取图像和标签,转换其尺寸和类型
  51. img = np.reshape(imgs[i], [1, IMG_ROWS, IMG_COLS]).astype('float32')
  52. label = np.reshape(labels[i], [1]).astype('int64')
  53. imgs_list.append(img)
  54. labels_list.append(label)
  55. # 如果当前数据缓存达到了batch size,就返回一个批次数据
  56. if len(imgs_list) == BATCHSIZE:
  57. yield np.array(imgs_list), np.array(labels_list)
  58. # 清空数据缓存列表
  59. imgs_list = []
  60. labels_list = []
  61. # 如果剩余数据的数目小于BATCHSIZE,
  62. # 则剩余数据一起构成一个大小为len(imgs_list)的mini-batch
  63. if len(imgs_list) > 0:
  64. yield np.array(imgs_list), np.array(labels_list)
  65. return data_generator
  66. # 定义模型结构
  67. class MNIST(fluid.dygraph.Layer):
  68. def __init__(self, name_scope):
  69. super(MNIST, self).__init__(name_scope)
  70. name_scope = self.full_name()
  71. # 定义一个卷积层,使用relu激活函数
  72. self.conv1 = Conv2D(num_channels=1, num_filters=20, filter_size=5, stride=1, padding=2, act='relu')
  73. # 定义一个池化层,池化核为2,步长为2,使用最大池化方式
  74. self.pool1 = Pool2D(pool_size=2, pool_stride=2, pool_type='max')
  75. # 定义一个卷积层,使用relu激活函数
  76. self.conv2 = Conv2D(num_channels=20, num_filters=20, filter_size=5, stride=1, padding=2, act='relu')
  77. # 定义一个池化层,池化核为2,步长为2,使用最大池化方式
  78. self.pool2 = Pool2D(pool_size=2, pool_stride=2, pool_type='max')
  79. # 定义一个全连接层,输出节点数为10
  80. self.fc = Linear(input_dim=980, output_dim=10, act='softmax')
  81. # 定义网络的前向计算过程
  82. def forward(self, inputs):
  83. x = self.conv1(inputs)
  84. x = self.pool1(x)
  85. x = self.conv2(x)
  86. x = self.pool2(x)
  87. x = fluid.layers.reshape(x, [x.shape[0], 980])
  88. x = self.fc(x)
  89. return x

单GPU训练

飞桨动态图通过fluid.dygraph.guard(place=None)里的place参数,设置在GPU上训练还是CPU上训练。

  1. with fluid.dygraph.guard(place=fluid.CPUPlace()) #设置使用CPU资源训神经网络。
  2. with fluid.dygraph.guard(place=fluid.CUDAPlace(0)) #设置使用GPU资源训神经网络,默认使用服务器的第一个GPU卡。"0"是GPU卡的编号,比如一台服务器有的四个GPU卡,编号分别为0、1、2、3。
  1. #仅前3行代码有所变化,在使用GPU时,可以将use_gpu变量设置成True
  2. use_gpu = False
  3. place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
  4. with fluid.dygraph.guard(place):
  5. model = MNIST("mnist")
  6. model.train()
  7. #调用加载数据的函数
  8. train_loader = load_data('train')
  9. #四种优化算法的设置方案,可以逐一尝试效果
  10. optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.01, parameter_list=model.parameters())
  11. #optimizer = fluid.optimizer.MomentumOptimizer(learning_rate=0.01, momentum=0.9, parameter_list=model.parameters())
  12. #optimizer = fluid.optimizer.AdagradOptimizer(learning_rate=0.01, parameter_list=model.parameters())
  13. #optimizer = fluid.optimizer.AdamOptimizer(learning_rate=0.01, parameter_list=model.parameters())
  14. EPOCH_NUM = 2
  15. for epoch_id in range(EPOCH_NUM):
  16. for batch_id, data in enumerate(train_loader()):
  17. #准备数据,变得更加简洁
  18. image_data, label_data = data
  19. image = fluid.dygraph.to_variable(image_data)
  20. label = fluid.dygraph.to_variable(label_data)
  21. #前向计算的过程
  22. predict = model(image)
  23. #计算损失,取一个批次样本损失的平均值
  24. loss = fluid.layers.cross_entropy(predict, label)
  25. avg_loss = fluid.layers.mean(loss)
  26. #每训练了100批次的数据,打印下当前Loss的情况
  27. if batch_id % 200 == 0:
  28. print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy()))
  29. #后向传播,更新参数的过程
  30. avg_loss.backward()
  31. optimizer.minimize(avg_loss)
  32. model.clear_gradients()
  33. #保存模型参数
  34. fluid.save_dygraph(model.state_dict(), 'mnist')
  1. loading mnist dataset from ./work/mnist.json.gz ......
  2. epoch: 0, batch: 0, loss is: [2.7262428]

分布式训练

在工业实践中,很多较复杂的任务需要使用更强大的模型。强大模型加上海量的训练数据,经常导致模型训练耗时严重。比如在计算机视觉分类任务中,训练一个在ImageNet数据集上精度表现良好的模型,大概需要一周的时间,因为过程中我们需要不断尝试各种优化的思路和方案。如果每次训练均要耗时1周,这会大大降低模型迭代的速度。在机器资源充沛的情况下,建议采用分布式训练,大部分模型的训练时间可压缩到小时级别。

分布式训练有两种实现模式:模型并行和数据并行。

模型并行

模型并行是将一个网络模型拆分为多份,拆分后的模型分到多个设备上(GPU)训练,每个设备的训练数据是相同的。模型并行的实现模式可以节省内存,但是应用较为受限。

模型并行的方式一般适用于如下两个场景:

  • 模型架构过大: 完整的模型无法放入单个GPU。如2012年ImageNet大赛的冠军模型AlexNet是模型并行的典型案例,由于当时GPU内存较小,单个GPU不足以承担AlexNet,因此研究者将AlexNet拆分为两部分放到两个GPU上并行训练。

  • 网络模型的结构设计相对独立: 当网络模型的设计结构可以并行化时,采用模型并行的方式。如在计算机视觉目标检测任务中,一些模型(如YOLO9000)的边界框回归和类别预测是独立的,可以将独立的部分放到不同的设备节点上完成分布式训练。

数据并行

数据并行与模型并行不同,数据并行每次读取多份数据,读取到的数据输入给多个设备(GPU)上的模型,每个设备上的模型是完全相同的,飞桨采用的就是这种方式。


说明:

当前GPU硬件技术快速发展,深度学习使用的主流GPU的内存已经足以满足大多数的网络模型需求,所以大多数情况下使用数据并行的方式。


数据并行的方式与众人拾柴火焰高的道理类似,如果把训练数据比喻为砖头,把一个设备(GPU)比喻为一个人,那单GPU训练就是一个人在搬砖,多GPU训练就是多个人同时搬砖,每次搬砖的数量倍数增加,效率呈倍数提升。值得注意的是,每个设备的模型是完全相同的,但是输入数据不同,因此每个设备的模型计算出的梯度是不同的。如果每个设备的梯度只更新当前设备的模型,就会导致下次训练时,每个模型的参数都不相同。因此我们还需要一个梯度同步机制,保证每个设备的梯度是完全相同的。

梯度同步有两种方式:PRC通信方式和NCCL2通信方式(Nvidia Collective multi-GPU Communication Library)。

PRC通信方式

PRC通信方式通常用于CPU分布式训练,它有两个节点:参数服务器Parameter server和训练节点Trainer,结构如 图2 所示。

【手写数字识别】之资源配置 - 图2

图2:Pserver通信方式的结构

parameter server收集来自每个设备的梯度更新信息,并计算出一个全局的梯度更新。Trainer用于训练,每个Trainer上的程序相同,但数据不同。当Parameter server收到来Trainer的梯度更新请求时,统一更新模型的梯度。

NCCL2通信方式(Collective)

当前飞桨的GPU分布式训练使用的是基于NCCL2的通信方式,结构如 图3 所示。

【手写数字识别】之资源配置 - 图3

图3:NCCL2通信方式的结构

相比PRC通信方式,使用NCCL2(Collective通信方式)进行分布式训练,不需要启动Pserver进程,每个Trainer进程保存一份完整的模型参数,在完成梯度计算之后通过Trainer之间的相互通信,Reduce梯度数据到所有节点的所有设备,然后每个节点在各自完成参数更新。

飞桨提供了便利的数据并行训练方式,用户只需要对程序进行简单修改,即可实现在多GPU上并行训练。接下来将讲述如何将一个单机程序通过简单的改造,变成多机多卡程序。

在启动训练前,需要配置如下参数:

  • 从环境变量获取设备的ID,并指定给CUDAPlace。
  1. device_id = fluid.dygraph.parallel.Env().dev_id
  2. place = fluid.CUDAPlace(device_id)
  • 对定义的网络做预处理,设置为并行模式。
  1. strategy = fluid.dygraph.parallel.prepare_context() ## 新增
  2. model = MNIST("mnist")
  3. model = fluid.dygraph.parallel.DataParallel(model, strategy) ## 新增
  • 定义多GPU训练的reader,不同ID的GPU加载不同的数据集。
  1. valid_loader = paddle.batch(paddle.dataset.mnist.test(), batch_size=16, drop_last=true)
  2. valid_loader = fluid.contrib.reader.distributed_batch_reader(valid_loader)
  • 收集每批次训练数据的loss,并聚合参数的梯度。
  1. avg_loss = mnist.scale_loss(avg_loss) ## 新增
  2. avg_loss.backward()
  3. mnist.apply_collective_grads() ## 新增

完整程序如下所示。

  1. def train_multi_gpu():
  2. ##修改1-从环境变量获取使用GPU的序号
  3. place = fluid.CUDAPlace(fluid.dygraph.parallel.Env().dev_id)
  4. with fluid.dygraph.guard(place):
  5. ##修改2-对原模型做并行化预处理
  6. strategy = fluid.dygraph.parallel.prepare_context()
  7. model = MNIST("mnist")
  8. model = fluid.dygraph.parallel.DataParallel(model, strategy)
  9. model.train()
  10. #调用加载数据的函数
  11. train_loader = load_data('train')
  12. ##修改3-多GPU数据读取,必须确保每个进程读取的数据是不同的
  13. train_loader = fluid.contrib.reader.distributed_batch_reader(train_loader)
  14. optimizer = fluid.optimizer.SGDOptimizer(learning_rate=0.01, parameter_list=model.parameters())
  15. EPOCH_NUM = 5
  16. for epoch_id in range(EPOCH_NUM):
  17. for batch_id, data in enumerate(train_loader()):
  18. #准备数据
  19. image_data, label_data = data
  20. image = fluid.dygraph.to_variable(image_data)
  21. label = fluid.dygraph.to_variable(label_data)
  22. predict = model(image)
  23. loss = fluid.layers.square_error_cost(predict, label)
  24. avg_loss = fluid.layers.mean(loss)
  25. # 修改4-多GPU训练需要对Loss做出调整,并聚合不同设备上的参数梯度
  26. avg_loss = mnist.scale_loss(avg_loss)
  27. avg_loss.backward()
  28. model.apply_collective_grads()
  29. # 最小化损失函数,清除本次训练的梯度
  30. optimizer.minimize(avg_loss)
  31. model.clear_gradients()
  32. if batch_id % 200 == 0:
  33. print("epoch: {}, batch: {}, loss is: {}".format(epoch_id, batch_id, avg_loss.numpy()))
  34. #保存模型参数
  35. fluid.save_dygraph(model.state_dict(), 'mnist')

启动多GPU的训练,还需要在命令行中设置一些参数变量。打开终端,运行如下命令:

  1. $ python -m paddle.distributed.launch --selected_gpus=0,1,2,3 --log_dir ./mylog train_multi_gpu.py
  • paddle.distributed.launch:启动分布式运行。
  • selected_gpus:设置使用的GPU的序号(需要是多GPU卡的机器,通过命令watch nvidia-smi查看GPU的序号)。
  • log_dir:存放训练的log,若不设置,每个GPU上的训练信息都会打印到屏幕。
  • train_multi_gpu.py:多GPU训练的程序,包含修改过的train_multi_gpu()函数。

说明:

AI Studio当前仅支持单卡GPU,因此本案例需要在本地GPU上执行,无法在AI Studio上演示。


训练完成后,在指定的./mylog文件夹下会产生四个日志文件,其中worklog.0的内容如下:

  1. grep: warning: GREP_OPTIONS is deprecated; please use an alias or script
  2. dev_id 0
  3. I1104 06:25:04.377323 31961 nccl_context.cc:88] worker: 127.0.0.1:6171 is not ready, will retry after 3 seconds...
  4. I1104 06:25:07.377645 31961 nccl_context.cc:127] init nccl context nranks: 3 local rank: 0 gpu id: 1
  5. W1104 06:25:09.097079 31961 device_context.cc:235] Please NOTE: device: 1, CUDA Capability: 61, Driver API Version: 10.1, Runtime API Version: 9.0
  6. W1104 06:25:09.104460 31961 device_context.cc:243] device: 1, cuDNN Version: 7.5.
  7. start data reader (trainers_num: 3, trainer_id: 0)
  8. epoch: 0, batch_id: 10, loss is: [0.47507238]
  9. epoch: 0, batch_id: 20, loss is: [0.25089613]
  10. epoch: 0, batch_id: 30, loss is: [0.13120805]
  11. epoch: 0, batch_id: 40, loss is: [0.12122715]
  12. epoch: 0, batch_id: 50, loss is: [0.07328521]
  13. epoch: 0, batch_id: 60, loss is: [0.11860339]
  14. epoch: 0, batch_id: 70, loss is: [0.08205047]
  15. epoch: 0, batch_id: 80, loss is: [0.08192863]
  16. epoch: 0, batch_id: 90, loss is: [0.0736289]
  17. epoch: 0, batch_id: 100, loss is: [0.08607423]
  18. start data reader (trainers_num: 3, trainer_id: 0)
  19. epoch: 1, batch_id: 10, loss is: [0.07032011]
  20. epoch: 1, batch_id: 20, loss is: [0.09687119]
  21. epoch: 1, batch_id: 30, loss is: [0.0307216]
  22. epoch: 1, batch_id: 40, loss is: [0.03884467]
  23. epoch: 1, batch_id: 50, loss is: [0.02801813]
  24. epoch: 1, batch_id: 60, loss is: [0.05751991]
  25. epoch: 1, batch_id: 70, loss is: [0.03721186]
  26. .....