본문 바로가기

DeepLearning/파이토치(pytorch)

[torch] cross-validation을 위한 기능 만들기 (multi-gpu)

728x90
반응형

정리가 안된 단계이지만 DistributedDataParallel 기능을 잘 활용하면 cross-validation을 수행할 수 있을 것으로 보인다.

구체적으로 각 gpu만큼 k-fold cv가 가능하다.

다만, 각 gpu에서 연산된 loss값 같은 것은 pickle로 내보내게 한 후에 하나로 합쳐야할 것으로 보인다.

 

 

테스트가 필요한 사항은 아래와 같다.

DDP를 통해 각 gpu에서 진행되던 모델이 학습되는 것이 연동된다면 정확한 k-fold는 아니게 된다.

 

import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

# 윈도우 플랫폼에서 torch.distributed 패키지는
# Gloo backend, FileStore 및 TcpStore 만을 지원합니다.
# FileStore의 경우, init_process_group 에서
# init_method 매개변수를 로컬 파일로 설정합니다.
# 다음 예시:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
#    "gloo",
#    rank=rank,
#    init_method=init_method,
#    world_size=world_size)
# TcpStore의 경우 리눅스와 동일한 방식입니다.
# https://velog.io/@hsp/Pytorch%EB%A1%9C-Data-%EB%B6%84%EC%82%B0%ED%95%99%EC%8A%B5%ED%95%98%EA%B8%B0
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    # 작업 그룹 초기화
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()
    
    
# generate random dataset
from sklearn.datasets import make_classification
X, y =make_classification(random_state=42,n_samples=6400,n_features=10)
X.shape

class ToyModel(nn.Module):
    def __init__(self):
        super(ToyModel, self).__init__()
        self.net1 = nn.Linear(10, 10)
        self.relu = nn.ReLU()
        self.net2 = nn.Linear(10, 10)
        #self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        return self.net2(self.relu(self.net1(x)))

if False: # test
    model=ToyModel()
    model(torch.tensor(X).float())
    del model

from tqdm import tqdm
def demo_basic(rank, world_size,x):
    print(f"Running basic DDP example on rank {rank}.")
    setup(rank, world_size)
    # 모델을 생성하고 순위 아이디가 있는 GPU로 전달
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    n=0
    for td in x:
        td=torch.tensor(td).float().to(rank)
        pred=ddp_model(td)
        mse_loss=loss_fn(td,pred)
        optimizer.zero_grad()
        mse_loss.backward()
        optimizer.step()
        n+=1
    print(mse_loss,n)
    cleanup()


def run_demo(demo_fn, world_size,x):
    mp.spawn(demo_fn,
             args=(world_size,x,),
             nprocs=world_size,
             join=True)

def demo_checkpoint(rank, world_size):
    print(f"Running DDP checkpoint example on rank {rank}.")
    setup(rank, world_size)
    model = ToyModel().to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
    if rank == 0:
        # 모든 작업은 같은 매개변수로부터 시작된다고 생각해야 합니다.
        # 무작위의 매개변수와 변화도는 역전파 전달로 동기화됩니다.
        # 그럼으로, 하나의 작업은 모델을 저장하기에 충분합니다.
        torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
    # 작업 0이 저장한 후 작업 1이 모델을 읽어오도록 barrier()를 사용합니다.
    dist.barrier()
    # configure map_location properly
    map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
    ddp_model.load_state_dict(
        torch.load(CHECKPOINT_PATH, map_location=map_location))
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    optimizer.zero_grad()
    outputs = ddp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(rank)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    # 파일삭제를 보호하기 위해 아래에 dist.barrier()를 사용할 필요는 없습니다.
    # DDP의 역전파 전달 과정에 있는 AllReduce 옵스(ops)가 동기화 기능을 수행했기 때문에
    if rank == 0:
        os.remove(CHECKPOINT_PATH)
    cleanup()

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)
    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)


def demo_model_parallel(rank, world_size):
    print(f"Running DDP with model parallel example on rank {rank}.")
    setup(rank, world_size)
    # 작업을 위한 mp_model 및 장치 설정
    dev0 = (rank * 2) % world_size
    dev1 = (rank * 2 + 1) % world_size
    mp_model = ToyMpModel(dev0, dev1)
    ddp_mp_model = DDP(mp_model)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
    optimizer.zero_grad()
    # 출력값은 dev1에 저장
    outputs = ddp_mp_model(torch.randn(20, 10))
    labels = torch.randn(20, 5).to(dev1)
    loss_fn(outputs, labels).backward()
    optimizer.step()
    cleanup()

import time
if __name__ == "__main__":
    from sklearn.datasets import make_classification
    X, y =make_classification(random_state=42,n_samples=6400,n_features=10)
    tdata=torch.utils.data.DataLoader(dataset=torch.tensor(X),batch_size=64,shuffle=True,drop_last=True)
    n_gpus = torch.cuda.device_count()
    assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
    world_size = n_gpus
    s1=time.time();run_demo(demo_basic, world_size,x=tdata);e1=time.time();print(e1-s1)
    if 0:
        s2=time.time()
        run_demo(demo_checkpoint, world_size);print('2')
        e2=time.time()
        s3=time.time()
        run_demo(demo_model_parallel, world_size);print('3')
        e3=time.time()
        print(e1-s1,e2-s2,e3-s3)

 

 

정리가 안된단ㄱ

728x90
반응형