파이썬3

파이썬 병렬화 (multiprocessing)

TTSR 2023. 12. 19. 12:38
728x90
반응형

1. 개요

파이썬은 특별하게 뭔가 설정하지 않으면 core와 thred가 몇 개이든 single thred 1개만 사용한다.

이렇게 되면 대단히 비효율적이므로 병렬화를 시켜서 하는 방법이 있다.

multiprocessing은 여러가지 병렬화 관련 작업을 도와주는데 여기서는 starmap이라는 메서드에대해 소개한다.

map과 starmap의 차이는 argument를 몇 개 받을 수 있냐의 차이로 map은 1개 밖에 안되는 반면 starmap은 2개 이상이 가능하다.

starmap과 starmap_async는 병렬처리 시 작업 결과물을 정렬할지에 관한 것으로 async 쪽이 아무래도 좀 더 빠르다.

 

2. 내용

import multiprocessing
import os
# 일부러 multiple arguments를 받도록 function을 설정했다.
def work_func(x,y):
	print('I am pid :',os.getpid())
    return x**5+y
    
# 실행
import time
def multi(num_process):
	'''
    num_process : 얼만큼 많은 process를 할당할지를 뜻함.
    '''
    start=time.time()
    pool=multiprocessing.Pool(processes=num_core) 
    inputs=[[x,y] for x in range(1000) for y in range(1000)] # input조합수 10만개
    end=time.time()
    print(end-start)
    return pool.starmap_async(work_func,inputs)
    
def single():
	start=time.time()
    pool=multiprocessing.Pool(processes=1) 
    inputs=[[x,y] for x in range(1000) for y in range(1000)] # input조합수 10만개
    end=time.time()
    print(end-start)
    return pool.starmap_async(work_func,inputs)
    
multi(num_process=10)
single()

 

주의할점은 process를 다시 합쳐야하는 과정이 있으므로 빠르지 않을 수도 있다.

starmap_async로 할 경우에는 순서와 상관없이 끝나는대로 정리하기 때문에 속도가 빨라진다.

 

4. can't pickle 어쩌고 에러 뜰 때 해결책

 multiprocessing의 단점은 함수를 부를 때 무조건 top level에서 접근 가능한 것이야 한다는 것이다.

즉 함수 안에 함수를 만드는 식이면 돌아가지 않는다. 이를 해결하기 위해서는 아래의 기능을 사용해야한다.

"""
출처 : https://gist.github.com/EdwinChan/3c13d3a746bb3ec5082f
"""

import concurrent.futures
import multiprocessing
import sys
import uuid
from itertools import combinations


def globalize(func):
    def result(*args, **kwargs):
	    return func(*args, **kwargs)
    result.__name__ = result.__qualname__ = uuid.uuid4().hex
    setattr(sys.modules[result.__module__], result.__name__, result)
    return result

@globalize
def func1(x): # 돌리고 싶은 기능 아무거나 넣는다.
	return x
    
'''
예제
'''
def func2(x):
	pool=multiprocessing.Pool(10)
    result=pool.starmap_async(func1,x).get()
    return result

 

예를들어 아래의 기능을 예로 들어보겠다.

import concurrent.futures
import multiprocessing
import sys
import uuid

def globalize(func):
    def result(*args, **kwargs):
        return func(*args, **kwargs)
    result.__name__ = result.__qualname__ = uuid.uuid4().hex
    setattr(sys.modules[result.__module__], result.__name__, result)
    return result

@globalize
def calculate_pairwise_identity(seq1, seq2):
        """
        Calculates the pairwise identity between two sequences.
        서열 1과 2는 동일한 길이라고 가정함.
        """
        matches = 0
        for i in range(len(seq1)):
            if seq1[i] == seq2[i]:
                matches += 1
        return matches / len(seq1)

pool=multiprocessing.Pool(2)
aa_block=list('abcdefghijk'.upper())
peptides=[''.join(np.random.choice(aa_block,9)) for aa in range(100)]

def calculate_identity(peptides):
    peptides=combinations(peptides,2)
    identities=pool.starmap_async(calculate_pairwise_identity,peptides)
    return identities.get()

result=calculate_identity(peptides)

# non-global
def calculate_pairwise_identity2(seq1, seq2):
        """
        Calculates the pairwise identity between two sequences.
        서열 1과 2는 동일한 길이라고 가정함.
        """
        matches = 0
        for i in range(len(seq1)):
            if seq1[i] == seq2[i]:
                matches += 1
        return matches / len(seq1)


calculate_identity(peptides)

def calculate_identity2(peptides):
    peptides=combinations(peptides,2)
    identities=pool.starmap_async(calculate_pairwise_identity2,peptides)
    return identities.get()

result2=calculate_identity2(peptides)
# error
728x90
반응형