PIPELINING PARALLELISM

We have introduced the characteristics of pipelining parallelism in COMMON DISTRIBUTED PARALLEL STRATEGIES.

From OneFlow's consistent view, pipelining parallelism can be achieved simply by setting the placement attribute of a Tensor or Module.

The following code is a simple example that runs the network from QUICKSTART with pipelining parallelism: nn.Flatten, nn.Linear(28*28, 512), and nn.ReLU() run on GPU 0, and the remaining layers of the network run on GPU 1.

Code

```python
import oneflow as flow

BATCH_SIZE = 16
BROADCAST = [flow.sbp.broadcast]
P0 = flow.placement("cuda", {0: [0]})
P1 = flow.placement("cuda", {0: [1]})

class Stage0Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = flow.nn.Flatten()
        self.linear0 = flow.nn.Linear(28*28, 512)
        self.relu0 = flow.nn.ReLU()

    def forward(self, x):
        out = self.flatten(x)
        out = self.linear0(out)
        out = self.relu0(out)
        return out

class Stage1Module(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.linear1 = flow.nn.Linear(512, 512)
        self.relu1 = flow.nn.ReLU()
        self.linear2 = flow.nn.Linear(512, 10)
        self.relu2 = flow.nn.ReLU()

    def forward(self, x):
        out = self.linear1(x)
        out = self.relu1(out)
        out = self.linear2(out)
        out = self.relu2(out)
        return out

class PipelineModule(flow.nn.Module):
    def __init__(self):
        super().__init__()
        self.m_stage0 = Stage0Module()
        self.m_stage1 = Stage1Module()

        # Place stage 0 on GPU 0 and stage 1 on GPU 1.
        self.m_stage0.to_consistent(placement=P0, sbp=BROADCAST)
        self.m_stage1.to_consistent(placement=P1, sbp=BROADCAST)

    def forward(self, x):
        out_stage0 = self.m_stage0(x)
        # Move the output of stage 0 to the placement of stage 1.
        in_stage1 = out_stage0.to_consistent(placement=P1, sbp=BROADCAST)
        out_stage1 = self.m_stage1(in_stage1)
        return out_stage1

module_pipeline = PipelineModule()
sgd = flow.optim.SGD(module_pipeline.parameters(), lr=0.001)

class PipelineGraph(flow.nn.Graph):
    def __init__(self):
        super().__init__()
        self.module_pipeline = module_pipeline
        self.module_pipeline.m_stage0.config.stage_id = 0
        self.module_pipeline.m_stage1.config.stage_id = 1
        self.loss_fn = flow.nn.CrossEntropyLoss()
        self.config.set_gradient_accumulation_steps(2)
        self.add_optimizer(sgd)

    def build(self, x, y):
        out = self.module_pipeline(x)
        loss = self.loss_fn(out, y)
        loss.backward()
        return loss

graph_pipeline = PipelineGraph()

x = flow.randn(BATCH_SIZE, 1, 28, 28)
x = x.to_consistent(P0, BROADCAST)
y = flow.randint(0, 10, (BATCH_SIZE,))
y = y.to_consistent(P1, BROADCAST)

for i in range(20):
    loss = graph_pipeline(x, y)
    print(loss.to_local())
```

When the code above is saved as a script (pipeline.py), it can then be launched with the launch module:

```shell
python3 -m oneflow.distributed.launch --nproc_per_node 2 ./pipeline.py
```

More Details

Setting placement and SBP

The placement and SBP are set up at the beginning:

```python
BROADCAST = [flow.sbp.broadcast]
P0 = flow.placement("cuda", {0: [0]})
P1 = flow.placement("cuda", {0: [1]})
```

P0 and P1 represent GPU 0 and GPU 1 on machine 0, respectively.
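The dict passed to flow.placement maps a machine ID to a list of device IDs on that machine, so a placement can just as well cover several devices. For example (a hypothetical placement, not used in this tutorial), both GPUs of machine 0 would be described as:

```python
P01 = flow.placement("cuda", {0: [0, 1]})  # GPU 0 and GPU 1 on machine 0
```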

By calling nn.Module.to_consistent or Tensor.to_consistent, a module or tensor is distributed to the devices specified above, which breaks the network into stages.

Here we define a PipelineModule that sets the pipeline placement for each stage.

```python
class PipelineModule(flow.nn.Module):
    def __init__(self):
        #...
        self.m_stage0.to_consistent(placement=P0, sbp=BROADCAST)
        self.m_stage1.to_consistent(placement=P1, sbp=BROADCAST)

    def forward(self, x):
        out_stage0 = self.m_stage0(x)
        in_stage1 = out_stage0.to_consistent(placement=P1, sbp=BROADCAST)
        out_stage1 = self.m_stage1(in_stage1)
        return out_stage1
```
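To double-check which devices the stages actually ended up on, one option is to inspect the placement of the stage parameters (a quick sanity-check sketch; it assumes module_pipeline has already been constructed as above):

```python
# Parameters of a module converted by to_consistent are Consistent Tensors,
# so they carry a placement attribute.
print(module_pipeline.m_stage0.linear0.weight.placement)  # expected: GPU 0 (P0)
print(module_pipeline.m_stage1.linear1.weight.placement)  # expected: GPU 1 (P1)
```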

Transforming the Local Tensor to the Consistent Tensor

The example uses randomly generated data as input.

```python
x = flow.randn(BATCH_SIZE, 1, 28, 28)
x = x.to_consistent(P0, BROADCAST)
```

Because the command-line argument is --nproc_per_node 2, the launch module starts two processes when you launch the training. Both processes execute the code in the script.

The statement x = flow.randn(BATCH_SIZE, 1, 28, 28) returns a Local Tensor (local data that is valid only in the current process). When x = x.to_consistent(P0, BROADCAST) runs, OneFlow automatically combines the Local Tensors of all processes into a Consistent Tensor.

In practice, each computing device can load data locally and then convert its Local Tensor into a Consistent Tensor via to_consistent.
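As a minimal sketch of this idea (assuming a hypothetical placement P01 that covers both GPUs, unlike the single-GPU P0 and P1 above), each process creates only its own shard of the batch and declares it to be the dim-0 split piece of one logical Consistent Tensor:

```python
import oneflow as flow

BATCH_SIZE = 16
# Hypothetical placement covering GPU 0 and GPU 1 of machine 0.
P01 = flow.placement("cuda", {0: [0, 1]})

# Each of the two launched processes creates only half of the batch locally ...
x_local = flow.randn(BATCH_SIZE // 2, 1, 28, 28)

# ... and marks it as the shard along dim 0 of one logical batch.
# The resulting Consistent Tensor has the full logical shape (16, 1, 28, 28).
x_consistent = x_local.to_consistent(placement=P01, sbp=[flow.sbp.split(0)])
print(x_consistent.shape)
```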

Stage ID and Settings for Gradient Accumulation

The Stage ID is set via the config.stage_id attribute of a Module. Stage IDs are numbered starting from 0 and increase by 1.

Calling the self.config.set_gradient_accumulation_steps method sets the number of gradient accumulation steps.

From these two settings, OneFlow obtains the information it needs to implement micro-batches for pipelining parallelism.

```python
self.module_pipeline.m_stage0.config.stage_id = 0
self.module_pipeline.m_stage1.config.stage_id = 1
self.config.set_gradient_accumulation_steps(2)
```
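For intuition, the relation between these settings and the micro-batch size is simple arithmetic (using the values from this example; this is not an API call):

```python
BATCH_SIZE = 16
ACC_STEPS = 2                                # set_gradient_accumulation_steps(2)
micro_batch_size = BATCH_SIZE // ACC_STEPS   # 8 samples per micro-batch
# While stage 1 is still working on micro-batch i, stage 0 can already start
# micro-batch i + 1, so the two GPUs overlap their work instead of idling.
```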
