Pytorch 分布式训练
group
进程组。默认情况只有一个组,一个 job 为一个组,也为一个 world
world size
全局进程个数
rank
表示进程序号,用于进程间的通讯。rank=0 的主机为 master 节点
local rank
进程内 GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。 rank=3, local_rank=0 表示第 3 个进程内的第 1 块 GPU。
具体操作
首先需要进行一些参数的设置
import argparse
parser = argparse.ArgumentParser(description='PyTorch distributed training')
parser.add_argument("--local_rank", type=int, default=0)
parser.add_argument("--dist", type=bool, default=True)
parser.add_argument("--gpu_ids", type=list, default=[0,1,2,3])
args = parser.parse_args()
- local_rank 是为了 torch.distributed.launch 内部指定
- dist 是否使用分布式训练
- gpu_ids 指定使用 GPU 的编号
初始化分布式
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def init_dist(backend="nccl", **kwargs):
""" initialization for distributed training"""
if (
mp.get_start_method(allow_none=True) != "spawn"
): # Return the name of start method used for starting processes
mp.set_start_method("spawn", force=True) #'spawn' is the default on Windows
rank = int(os.environ['RANK']) # system env process ranks
num_gpus = torch.cuda.device_count() # Returns the number of GPUs available
torch.cuda.set_device(rank % num_gpus)
dist.init_process_group(
backend=backend, **kwargs
) # Initializes the default distributed process group
if args.dist:
init_dist()
world_size = (
torch.distributed.get_world_size()
) # Returns the number of processes in the current process group
rank = torch.distributed.get_rank() # Returns the rank of current process group
else:
rank = -1
torch.backends.cudnn.benchmark = True
需要进行修改的地方
- 需要记录日志的地方
if rank <= 0:
logger.info('Something need to log')
- 数据加载的地方
import math
from torch.utils.data import DataLoader
dataset_ratio = 200
if train:
train_set = define_Dataset(train_dataset)
train_size = int(math.ceil(len(train_set) / batch_size))
total_epochs = int(math.ceil(total_iters / train_size))
if args.dist:
world_size = torch.distributed.get_world_size()
assert batch_size % world_size == 0
batch_size = batch_size // world_size
train_sampler = DistIterSampler(
train_set, world_size, rank, dataset_ratio
)
total_epochs = int(math.ceil(total_iters / (train_size * dataset_ratio)))
train_loader = DataLoader(train_set,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
drop_last=True,
pin_memory=True,
sampler=train_sampler)
else:
train_loader = DataLoader(train_set,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
drop_last=True,
pin_memory=True)
else:
test_set = define_Dataset(test_dataset)
test_loader = DataLoader(test_set, batch_size=1,
shuffle=False, num_workers=1,
drop_last=False, pin_memory=True)
需要注意的是
♠ world_size 可以理解为 GPU 的数量,需要保证 batch_size 能整除 world_size 即把原本一个 batch 分给几个 GPU
♣ 使用分布式训练时 DataLoader 中 shuffle 需要为 False
♥ 测试时是使用单 GPU 的
♦ 分布式需要指定 sampler
DistIterSampler 的代码如下:
"""
Modified from torch.utils.data.distributed.DistributedSampler
Support enlarging the dataset for *iter-oriented* training, for saving time when restart the
dataloader after each epoch
"""
import math
import torch
import torch.distributed as dist
from torch.utils.data.sampler import Sampler
class DistIterSampler(Sampler):
"""Sampler that restricts data loading to a subset of the dataset.
It is especially useful in conjunction with
:class:`torch.nn.parallel.DistributedDataParallel`. In such case, each
process can pass a DistributedSampler instance as a DataLoader sampler,
and load a subset of the original dataset that is exclusive to it.
.. note::
Dataset is assumed to be of constant size.
Arguments:
dataset: Dataset used for sampling.
num_replicas (optional): Number of processes participating in
distributed training.
rank (optional): Rank of the current process within num_replicas.
"""
def __init__(self, dataset, num_replicas=None, rank=None, ratio=100):
if num_replicas is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
num_replicas = dist.get_world_size()
if rank is None:
if not dist.is_available():
raise RuntimeError("Requires distributed package to be available")
rank = dist.get_rank()
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.epoch = 0
self.num_samples = int(math.ceil(len(self.dataset) * ratio / self.num_replicas))
self.total_size = self.num_samples * self.num_replicas
def __iter__(self):
# deterministically shuffle based on epoch
g = torch.Generator()
g.manual_seed(self.epoch)
indices = torch.randperm(
self.total_size, generator=g
).tolist() # Returns a random permutation of integers from 0 to n - 1
dsize = len(self.dataset)
indices = [v % dsize for v in indices]
# subsample
indices = indices[self.rank : self.total_size : self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self):
return self.num_samples
def set_epoch(self, epoch):
self.epoch = epoch
- 模型的设置
from torch.nn.parallel import DataParallel, DistributedDataParallel
device = torch.device('cuda' if opt['gpu_ids'] is not None else 'cpu')
net = define_net().to(self.device)
if args.dist:
rank = torch.distributed.get_rank()
net = DistributedDataParallel(self.netG, device_ids=[torch.cuda.current_device()])
else:
rank = -1 # non dist training
net = DataParallel(self.netG)
input = input.to(f'cuda:{net.device_ids[0]}')
if isinstance(network, nn.DataParallel) or isinstance(network, DistributedDataParallel):
network = network.module
state_dict = network.state_dict()
for key, param in state_dict.items():
state_dict[key] = param.cpu()
torch.save(state_dict, save_path)
因此需要在模型的定义、加载、保存以及输入指定 GPU 需要修改。
- 脚本命令
CUDA_VISIBLE_DEVICES=0,1,2,3 python3 -m torch.distributed.launch --nproc_per_node=4 --master_port=3210 train.py
参数说明
♠ CUDA_VISIBLE_DEVICES 指定 GPU 的编号
♣ nproc_per_node 参数指定为当前主机创建的进程数。一般设定为当前主机的 GPU 数量
♥ master_port 分别指定 master 节点的 ip:port
其余就是哪错调哪了。