PyTorch 分布式训练

  • group

    进程组。默认情况只有一个组,一个 job 为一个组,也为一个 world

  • world size

    全局进程个数

  • rank

    表示进程序号,用于进程间的通讯。rank=0 的进程为 master 节点

  • local rank

    节点(主机)内的进程编号,通常与该进程使用的 GPU 编号一一对应;非显式参数,由 torch.distributed.launch 内部指定。例如 rank=3, local_rank=0 表示全局序号为 3 的进程,使用其所在节点上的第 1 块 GPU。

具体操作

首先需要进行一些参数的设置

    import argparse


    def _str2bool(value):
        """Parse a command-line boolean such as ``true``/``false``/``1``/``0``.

        ``type=bool`` must not be used with argparse: ``bool("False")`` is True,
        so every non-empty value would enable the flag.
        """
        if isinstance(value, bool):
            return value
        text = value.strip().lower()
        if text in ("true", "t", "yes", "y", "1"):
            return True
        if text in ("false", "f", "no", "n", "0"):
            return False
        raise argparse.ArgumentTypeError(f"expected a boolean value, got {value!r}")


    def _gpu_id_list(value):
        """Parse a comma-separated list of GPU indices, e.g. ``0,1,2,3``.

        ``type=list`` would split the raw string into single characters instead.
        """
        try:
            return [int(item) for item in value.split(",") if item.strip()]
        except ValueError:
            raise argparse.ArgumentTypeError(
                f"expected comma-separated integers, got {value!r}"
            ) from None


    parser = argparse.ArgumentParser(description='PyTorch distributed training')
    # Accept both spellings: launchers pass --local_rank (PyTorch < 2.0) or
    # --local-rank (PyTorch >= 2.0). The destination stays ``args.local_rank``.
    parser.add_argument("--local_rank", "--local-rank", type=int, default=0)
    parser.add_argument("--dist", type=_str2bool, default=True)
    parser.add_argument("--gpu_ids", type=_gpu_id_list, default=[0, 1, 2, 3])
    args = parser.parse_args()
  • local_rank 由 torch.distributed.launch 在启动各进程时自动传入,无需手动指定
  • dist 是否使用分布式训练
  • gpu_ids 指定使用 GPU 的编号

初始化分布式


import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def init_dist(backend="nccl", **kwargs):
    """Initialize the default process group for distributed training.

    Forces the multiprocessing start method to "spawn", binds the current
    process to the CUDA device ``RANK % device_count`` and then calls
    ``dist.init_process_group``.

    Args:
        backend: Backend name forwarded to ``dist.init_process_group``.
        **kwargs: Extra keyword arguments forwarded to
            ``dist.init_process_group``.

    Raises:
        KeyError: If the ``RANK`` environment variable is not set.
        RuntimeError: If no CUDA device is visible to this process.
    """
    # Switch to "spawn" unless it is already the configured start method.
    if mp.get_start_method(allow_none=True) != "spawn":
        mp.set_start_method("spawn", force=True)

    rank = int(os.environ['RANK'])  # process rank taken from the environment
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        # Without this guard the modulo below fails with an opaque
        # ZeroDivisionError.
        raise RuntimeError(
            "init_dist requires at least one visible CUDA device, found none"
        )

    # Map the rank onto one of the visible devices.
    torch.cuda.set_device(rank % num_gpus)
    dist.init_process_group(backend=backend, **kwargs)

# Resolve the rank of this process; -1 marks a non-distributed run.
if not args.dist:
    rank = -1
else:
    init_dist()
    # Number of processes in the default group and this process's index in it.
    world_size = dist.get_world_size()
    rank = dist.get_rank()

# Turn on cuDNN benchmark mode.
torch.backends.cudnn.benchmark = True

需要进行修改的地方

  • 需要记录日志的地方

# Log only from the master process (rank 0) or from a non-distributed run
# (rank -1), so the same message is not written once per process.
if rank <= 0:
    logger.info('Something need to log')
  • 数据加载的地方

import math

from torch.utils.data import DataLoader

# Enlargement factor passed to DistIterSampler as its ``ratio`` argument.
dataset_ratio = 200

if not train:
    # Evaluation: one sample per batch, original order, nothing dropped.
    test_set = define_Dataset(test_dataset)
    test_loader = DataLoader(
        test_set,
        batch_size=1,
        shuffle=False,
        num_workers=1,
        drop_last=False,
        pin_memory=True,
    )
else:
    train_set = define_Dataset(train_dataset)
    # Iterations per pass over the data, computed with the global batch size.
    train_size = int(math.ceil(len(train_set) / batch_size))
    total_epochs = int(math.ceil(total_iters / train_size))

    if not args.dist:
        train_loader = DataLoader(
            train_set,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            drop_last=True,
            pin_memory=True,
        )
    else:
        world_size = torch.distributed.get_world_size()
        # The global batch is split evenly across processes.
        assert batch_size % world_size == 0
        batch_size //= world_size

        train_sampler = DistIterSampler(train_set, world_size, rank, dataset_ratio)

        # One sampler "epoch" covers dataset_ratio passes over the data.
        total_epochs = int(math.ceil(total_iters / (train_size * dataset_ratio)))

        # The sampler decides the order, so DataLoader shuffling stays off.
        train_loader = DataLoader(
            train_set,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
            drop_last=True,
            pin_memory=True,
            sampler=train_sampler,
        )

需要注意的是

♠ world_size 可以理解为 GPU 的数量,需要保证 batch_size 能被 world_size 整除,即把原本一个 batch 平均分给几个 GPU

♣ 使用分布式训练时 DataLoader 中 shuffle 需要为 False(已指定 sampler,数据的打乱由 sampler 负责)

♥ 测试时是使用单 GPU 的

♦ 分布式需要指定 sampler

DistIterSampler 的代码如下:


"""
Modified from torch.utils.data.distributed.DistributedSampler
Support enlarging the dataset for *iter-oriented* training, for saving time when restart the
dataloader after each epoch
"""
import math

import torch
import torch.distributed as dist
from torch.utils.data.sampler import Sampler


class DistIterSampler(Sampler):
    """Per-process sampler over a virtually enlarged dataset.

    Every replica builds the same epoch-seeded permutation of
    ``total_size`` positions, keeps every ``num_replicas``-th position
    starting at ``rank`` and wraps the kept positions back into valid
    dataset indices with a modulo. With ``ratio > 1`` each dataset index is
    therefore visited several times per pass over the sampler.

    .. note::
        Dataset is assumed to be of constant size.

    Arguments:
        dataset: Dataset used for sampling; only ``len(dataset)`` is read.
        num_replicas (optional): Number of processes participating in
            distributed training. Defaults to ``dist.get_world_size()``.
        rank (optional): Rank of the current process within num_replicas.
            Defaults to ``dist.get_rank()``.
        ratio (optional): Factor by which the dataset length is multiplied
            before it is divided among the replicas. Defaults to 100.
    """

    def __init__(self, dataset, num_replicas=None, rank=None, ratio=100):
        # Anything left unspecified is looked up in the default process group.
        if num_replicas is None or rank is None:
            if not dist.is_available():
                raise RuntimeError("Requires distributed package to be available")
            if num_replicas is None:
                num_replicas = dist.get_world_size()
            if rank is None:
                rank = dist.get_rank()

        # Round up so every replica receives the same number of samples.
        per_replica = int(math.ceil(len(dataset) * ratio / num_replicas))

        self.dataset = dataset
        self.num_replicas = num_replicas
        self.rank = rank
        self.epoch = 0
        self.num_samples = per_replica
        self.total_size = per_replica * num_replicas

    def __iter__(self):
        # Seeding with the epoch makes all replicas agree on the permutation.
        seeded = torch.Generator()
        seeded.manual_seed(self.epoch)
        permutation = torch.randperm(self.total_size, generator=seeded).tolist()

        # This replica's strided share of the permutation.
        own_share = permutation[self.rank : self.total_size : self.num_replicas]

        # Fold positions beyond the real dataset length back into range.
        dataset_len = len(self.dataset)
        own_indices = [position % dataset_len for position in own_share]
        assert len(own_indices) == self.num_samples

        return iter(own_indices)

    def __len__(self):
        return self.num_samples

    def set_epoch(self, epoch):
        """Set the epoch used as the shuffling seed by the next iteration."""
        self.epoch = epoch
  • 模型的设置

from torch.nn.parallel import DataParallel, DistributedDataParallel

# Build the network and move it to the target device before wrapping it.
device = torch.device('cuda' if args.gpu_ids is not None else 'cpu')
net = define_net().to(device)

if args.dist:
    rank = torch.distributed.get_rank()
    # Wrap the model for this process, pinned to its current CUDA device.
    net = DistributedDataParallel(net, device_ids=[torch.cuda.current_device()])
else:
    rank = -1                    # non dist training
    net = DataParallel(net)


# Inputs go to the first device the wrapped model runs on.
input = input.to(f'cuda:{net.device_ids[0]}')

# Save the weights of the underlying model rather than of the parallel wrapper.
# `network` is presumably the (possibly wrapped) model, e.g. `net` - confirm at the call site.
if isinstance(network, (DataParallel, DistributedDataParallel)):
    network = network.module
state_dict = network.state_dict()

# Move every entry to the CPU in place, keeping the mapping object that
# state_dict() returned.
for key, param in state_dict.items():
    state_dict[key] = param.cpu()

torch.save(state_dict, save_path)

因此,模型的定义、加载、保存以及将输入放到指定 GPU 这几处都需要修改。

  • 脚本命令
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nproc_per_node=4 --master_port=3210 train.py

参数说明

♠ CUDA_VISIBLE_DEVICES 指定 GPU 的编号

♣ nproc_per_node 参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数量

♥ master_port 指定 master 节点的端口;与 master_addr 一起构成 master 节点的 ip:port

其余就是哪错调哪了。