본문 바로가기

DeepLearning/파이토치(pytorch)

[torch] multi-gpu (DDP)로 사용하기 튜토리얼

728x90
반응형

아래의 코드는 간단하게 autoencoder를 만드는 과정을 적은 것이다.

mp.spawn해봤자 global에 있는 model에는 영향을 안준다. 따라서, 학습 중간에 훈련된 trained-model을

torch.save로저장해야한다.

 

다른 것으로는 과거의 loss변동을 이용하는 옵티마이저들 (ex. ADAM)같은 것들도 저장해서 불러와야한다고 한다.

SGD는 해당사항이 없다. 이 부분에 대해서는 테스트가 필요할 것 같다.

다만, 나의 경우는 데이터가 너무 커서 컴퓨터 메모리에 모두 못 올리는 상황인 것이라 문제가 없지 않을까 한다.

현재 내 상황은 총 데이터는 5TB이고 메모리에 모두 못 올리니 50gb 정도 불러온 후에 이를 배치로 나눠서 훈련하고 있는 상황이다. 그리고 각 gpu에서 돌던 모델들이 동기화 된다는 것까지는 확인했지만 이런 옵티마이저를 불러와서 학습해도 괜찮을지는 모르겠다.

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, Dataset
# 윈도우 플랫폼에서 torch.distributed 패키지는
# Gloo backend, FileStore 및 TcpStore 만을 지원합니다.
# FileStore의 경우, init_process_group 에서
# init_method 매개변수를 로컬 파일로 설정합니다.
# 다음 예시:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# TcpStore의 경우 리눅스와 동일한 방식입니다.
# https://medium.com/tesser-team/%EB%8B%A4%EC%A4%91-gpu%EB%A5%BC-%ED%9A%A8%EC%9C%A8%EC%A0%81%EC%9C%BC%EB%A1%9C-%EC%82%AC%EC%9A%A9%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95-dp%EB%B6%80%ED%84%B0-fsdp%EA%B9%8C%EC%A7%80-3057d31150b6
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler



class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 10)
        #self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

import numpy as np
dataset=torch.tensor(np.random.randn(6400,10)).float()
from tqdm import tqdm
def train(rank, world_size, batch_size, model=ToyModel(), epochs=100,dataset=None,model_dir=None):
    global_rank = rank  # multi-nodes인 경우, 수정 필요
    dist.init_process_group(
        backend="nccl",
        init_method="tcp://127.0.0.1:33445",
        rank=global_rank,
        world_size=world_size,
    )
    torch.cuda.set_device(rank)
    print(rank)
    model.to(rank)
    model = DDP(model, device_ids=[rank],output_device=rank)
    print(model)
    print(dataset.shape)
    sampler = DistributedSampler(dataset, shuffle=True, drop_last=True)
    print(sampler)
    dataloader_ = torch.utils.data.DataLoader(dataset, batch_size=batch_size, sampler=sampler)
    #dataloader_ = torch.utils.data.DataLoader(dataset, batch_size=batch_size)
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
    criterion=nn.MSELoss()
    print('running')
    for epoch in tqdm(range(epochs)):
        sampler.set_epoch(epoch)
        for data in dataloader_:
            data=data.to(rank)
            #print(data.shape)
            #data = {k: data[k].to(rank) for k in data}
            optimizer.zero_grad()
            output = model(data)
            loss=criterion(output,data)
            loss.backward()
            optimizer.step()
    print('Validation')
    X=torch.tensor(dataset).float().to(rank)
    pred=model(X)
    print(rank,criterion(X,pred))
    if rank == 0:
        # 모든 작업은 같은 매개변수로부터 시작된다고 생각해야 합니다.
        # 무작위의 매개변수와 변화도는 역전파 전달로 동기화됩니다.
        # 그럼으로, 하나의 작업은 모델을 저장하기에 충분합니다.
        checkpoint_path=model_dir+'/ddp-tutorial-3-model.pt'
        import copy
        from collections import OrderedDict
        # Name change
        torch.save(model.state_dict(), checkpoint_path) # 저장
            
import torch.multiprocessing as mp
num_gpus = 2
world_size = 2
batch_size_per_gpu = 32
from tqdm import tqdm
#train(rank, world_size, batch_size, model=ToyModel(), epochs=100,dataset=None)
import time
if __name__ == "__main__":
    x=torch.tensor(np.random.randn(6400,10)).float()
    model=ToyModel()
    mse_loss=nn.MSELoss()
    before_learning_mse=mse_loss(x,model(x))
    print(before_learning_mse);time.sleep(1)
    model_dir='/data09/project/sjoh/DeepOmicsExome/model/v01/script/'
    mp.spawn(train, nprocs=num_gpus, args=(world_size, batch_size_per_gpu,model,1,dataset,model_dir),join=True)
    # 이상태에서는 mp.spawn에서 학습된 데이터를 꺼내올 수 없음.
    # 따라서, 중간과정을 저장하는 것이 필요함.
    after_learning_mse=mse_loss(x,model(x))
    # loading trained model
    state_dict=torch.load(f'{model_dir}/ddp-tutorial-3-model.pt')
    from collections import OrderedDict
    new_state_dict = OrderedDict()
    for k, v in state_dict.items():
        name = k[7:] # remove 'module.' of DataParallel/DistributedDataParallel
        new_state_dict[name] = v
    model.load_state_dict(new_state_dict)
    after_trained_model=mse_loss(x,model(x))
    print(before_learning_mse)
    print(after_learning_mse)
    print(after_trained_model)
728x90
반응형