파이썬3
파이썬 병렬화 (multiprocessing)
TTSR
2023. 12. 19. 12:38
728x90
반응형
1. 개요
파이썬은 특별하게 뭔가 설정하지 않으면 core와 thred가 몇 개이든 single thred 1개만 사용한다.
이렇게 되면 대단히 비효율적이므로 병렬화를 시켜서 하는 방법이 있다.
multiprocessing은 여러가지 병렬화 관련 작업을 도와주는데 여기서는 starmap이라는 메서드에대해 소개한다.
map과 starmap의 차이는 argument를 몇 개 받을 수 있냐의 차이로 map은 1개 밖에 안되는 반면 starmap은 2개 이상이 가능하다.
starmap과 starmap_async는 병렬처리 시 작업 결과물을 정렬할지에 관한 것으로 async 쪽이 아무래도 좀 더 빠르다.
2. 내용
import multiprocessing
import os
# 일부러 multiple arguments를 받도록 function을 설정했다.
def work_func(x,y):
print('I am pid :',os.getpid())
return x**5+y
# 실행
import time
def multi(num_process):
'''
num_process : 얼만큼 많은 process를 할당할지를 뜻함.
'''
start=time.time()
pool=multiprocessing.Pool(processes=num_core)
inputs=[[x,y] for x in range(1000) for y in range(1000)] # input조합수 10만개
end=time.time()
print(end-start)
return pool.starmap_async(work_func,inputs)
def single():
start=time.time()
pool=multiprocessing.Pool(processes=1)
inputs=[[x,y] for x in range(1000) for y in range(1000)] # input조합수 10만개
end=time.time()
print(end-start)
return pool.starmap_async(work_func,inputs)
multi(num_process=10)
single()
주의할점은 process를 다시 합쳐야하는 과정이 있으므로 빠르지 않을 수도 있다.
starmap_async로 할 경우에는 순서와 상관없이 끝나는대로 정리하기 때문에 속도가 빨라진다.
4. can't pickle 어쩌고 에러 뜰 때 해결책
multiprocessing의 단점은 함수를 부를 때 무조건 top level에서 접근 가능한 것이야 한다는 것이다.
즉 함수 안에 함수를 만드는 식이면 돌아가지 않는다. 이를 해결하기 위해서는 아래의 기능을 사용해야한다.
"""
출처 : https://gist.github.com/EdwinChan/3c13d3a746bb3ec5082f
"""
import concurrent.futures
import multiprocessing
import sys
import uuid
from itertools import combinations
def globalize(func):
def result(*args, **kwargs):
return func(*args, **kwargs)
result.__name__ = result.__qualname__ = uuid.uuid4().hex
setattr(sys.modules[result.__module__], result.__name__, result)
return result
@globalize
def func1(x): # 돌리고 싶은 기능 아무거나 넣는다.
return x
'''
예제
'''
def func2(x):
pool=multiprocessing.Pool(10)
result=pool.starmap_async(func1,x).get()
return result
예를들어 아래의 기능을 예로 들어보겠다.
import concurrent.futures
import multiprocessing
import sys
import uuid
def globalize(func):
def result(*args, **kwargs):
return func(*args, **kwargs)
result.__name__ = result.__qualname__ = uuid.uuid4().hex
setattr(sys.modules[result.__module__], result.__name__, result)
return result
@globalize
def calculate_pairwise_identity(seq1, seq2):
"""
Calculates the pairwise identity between two sequences.
서열 1과 2는 동일한 길이라고 가정함.
"""
matches = 0
for i in range(len(seq1)):
if seq1[i] == seq2[i]:
matches += 1
return matches / len(seq1)
pool=multiprocessing.Pool(2)
aa_block=list('abcdefghijk'.upper())
peptides=[''.join(np.random.choice(aa_block,9)) for aa in range(100)]
def calculate_identity(peptides):
peptides=combinations(peptides,2)
identities=pool.starmap_async(calculate_pairwise_identity,peptides)
return identities.get()
result=calculate_identity(peptides)
# non-global
def calculate_pairwise_identity2(seq1, seq2):
"""
Calculates the pairwise identity between two sequences.
서열 1과 2는 동일한 길이라고 가정함.
"""
matches = 0
for i in range(len(seq1)):
if seq1[i] == seq2[i]:
matches += 1
return matches / len(seq1)
calculate_identity(peptides)
def calculate_identity2(peptides):
peptides=combinations(peptides,2)
identities=pool.starmap_async(calculate_pairwise_identity2,peptides)
return identities.get()
result2=calculate_identity2(peptides)
# error
728x90
반응형