Process Multiprocessing이란
멀티 프로세싱 모듈은 2.6버전 부터 추가 되었습니다. 이전에는 Jesse Noller와 Richard Oudkerk에서 정의 되었는데요.
멀티프로세싱 모듈을 사용하면 스레딩 모듈로 스레드를 생성 할 수 있는 것과 동일한 방법으로 생성 가능합니다.
프로세스 생성은 GIL(Global interpreter Lock)을 피하고 시스템의 여러 프로세서를 최대한 활용할 수 있는 장점이 있습니다..
최대 장점은 실행 속도의 차이입니다.
최근들어 대용량 처리가 많아지다 보니 다양한 패키지에서 멀티 프로세싱을 활용하는 사례가 엄청 많아졌습니다.
이는 CUP 사용을 극대화 하여 빠른 처리를 합니다.
너무 많이 사용하면 오버헤드 증가 및 메모리 사용량이 급증하겠죠 ^^
프로세스간 상태 공유
쓰레드할때는 공유데이터를 사용할려고 하면 아무제약 없이 사용하면 되지만
프로세스는 각각 독립적인 공간을 가지므로 공유 매모리 맴에 저장해서 사용해야 합니다.
공유 데이터를 사용하는 방법은 여러가지 있는데요 그중 Value, Array, Queue에 대해서 하겠습니다.
Value
from multiprocessing import Process, Value, Array
import time
import os
def add_100(number):
for i in range(100):
time.sleep(0.01)
number.value += 1
if __name__ == "__main__":
shared_number = Value('i', 0)
print("Number at beginning is", shared_number)
p1 = Process(target=add_100, args=(shared_number,))
p2 = Process(target=add_100, args=(shared_number,))
p1.start()
p2.start()
p1.join()
p2.join()
print("number at end is", shared_number)
멀티 프로세싱 사용법은 스레드랑 같으니 이전글을 참고 하기 바랍니다.
결과
멀티 프로세스 또한 공유 메모리를 사용할 경우 서로 선점 문제를 일으키게 됩니다.
쓰레드나 프로세스나 똑같군요 ㅠㅠ
쓰레드 해결 법인 Lock을 사용해야 겠네요!!! gogo sing
from multiprocessing import Process, Value, Array, Lock
import time
import os
def add_100(number, lock):
for i in range(100):
time.sleep(0.01)
lock.acquire() # 선점 방지 Lock
number.value += 1
lock.release() # 선점 헤지
if __name__ == "__main__":
lock = Lock()
shared_number = Value('i', 0) # i = integer(정수)
print("Number at beginning is", shared_number)
p1 = Process(target=add_100, args=(shared_number, lock))
p2 = Process(target=add_100, args=(shared_number, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print("number at end is", shared_number)
하나의 영역을 같이 선점하지 못하게 Lock을 걸어 놓았습니다. (시원시원)
def add_100(number, lock):
for i in range(100):
time.sleep(0.01)
with lock: # 코드 줄이기!!
number.value += 1
Lock 부분을 줄일 수도 있습니다.!!!
Array
from multiprocessing import Process, Value, Array, Lock
import time
import os
def add_100(numbers, lock):
for i in range(100):
time.sleep(0.01)
for i in range(len(numbers)):
with lock:
numbers[i] += 1
if __name__ == "__main__":
lock = Lock()
shared_array = Array('d', [0.0, 100.0, 200.0]) # d = double (실수)
print("Number at beginning is", shared_array[:])
p1 = Process(target=add_100, args=(shared_array, lock))
p2 = Process(target=add_100, args=(shared_array, lock))
p1.start()
p2.start()
p1.join()
p2.join()
print("number at end is", shared_array[:])
프로세스가 2번 돌아 + 200식을 만들었네요.
결과 같이 이와 같이 안나오시는 분은 Lock의 여부를 파악해 주세요^^
Queue(큐~~)
from multiprocessing import Process, Value, Array, Lock, Queue
import time
import os
def sqare(numbers, queue):
for i in numbers:
queue.put(i*i)
def make_negative(numbers, queue):
for i in numbers:
queue.put(-1*i)
if __name__ == "__main__":
numbers = range(1, 6)
q = Queue()
p1 = Process(target=sqare, args=(numbers, q))
p2 = Process(target=make_negative, args=(numbers, q))
p1.start()
p2.start()
p1.join()
p2.join()
while not q.empty():
print(q.get())
큐에서는 몇가지 내부 메소드가 존재한다. 그중 가장 많이 쓰이는 몇게에 대해서 알아보자
- put() : 값을 넣는데 사용한다.
- get() : 값을 추출하는데 사용한다.
- empty() : 값이 비어있으면 True를 반환한다.
역시 마찬가지로 결과 같이 맘에 들지 않는군요 둘다 선점해서 값이 엉터리가 되었네요 ㅠㅠ
from multiprocessing import Process, Value, Array, Lock, Queue, Pool, Event
import time
import os
def sqare(numbers, queue, evt):
for i in numbers:
queue.put(i*i)
evt.set()
def make_negative(numbers, queue, evt):
evt.wait()
for i in numbers:
queue.put(-1*i)
if __name__ == "__main__":
evt = Event()
numbers = range(1, 6)
q = Queue()
p1 = Process(target=sqare, args=(numbers, q, evt))
p2 = Process(target=make_negative, args=(numbers, q, evt))
p1.start()
p2.start()
p1.join()
p2.join()
while not q.empty():
print(q.get())
그나마 보기가 조금 낳아 졌네요 위 코드는 스레드 블로깅 시간에 했던 Event를 사용했습니다.
Quque구조상 계속 put을 하므로 해당 결과 가 나오네요.. ㅎㅎ
Pool(폴)
from multiprocessing import Process, Value, Array, Lock, Queue, Pool, Event, Pool
import time
import os
def cube(number):
return number * number * number
if __name__ == "__main__":
numbers = range(10)
pool = Pool()
# map, apply, join, close
result = pool.map(cube, numbers)
# pool.apply(cube,numbers[0])
pool.close()
pool.join()
print(result)
Pool에도 몇가지 메소드를 가지고 있는데요. 살표보시죠
- map() : map은 여러분이 아시는 list를 만들기 좋은 함수죠 (map이 좋아~)
- close() : 리소스 낭비를 방지하기 위해 close호출 (없으면 애러)
- join() : 작업 완료 대기를 위해 호출
멀티 프로세싱 동작 시간 비교
저번 시간에 했던 2개의 쓰레드로 5000만 까지 돌려보는 코드를 작성 했습니다.
import threading
import time
shared_number = 0
def thread_1(number):
global shared_number
print("number = ",end=""), print(number)
for i in range(number):
shared_number += 1
print("end of thread_1",shared_number)
def thread_2(number):
global shared_number
print("number = ",end=""), print(number)
for i in range(number):
shared_number += 1
print("end of thread_2",shared_number)
if __name__ == "__main__":
threads = [ ]
start_time = time.time()
t1 = threading.Thread( target= thread_1, args=(50000000,) )
t1.start()
threads.append(t1)
t2 = threading.Thread( target= thread_2, args=(50000000,) )
t2.start()
threads.append(t2)
for t in threads:
t.join()
print("--- %s seconds ---" % (time.time() - start_time))
print("shared_number=",end=""), print(shared_number)
print("end of main")
경과 시간 8초대 !!!
멀티 프로세싱으로 구현(Queue 사용)
from multiprocessing import Process, Queue
import time
def worker(id, number, q):
increased_number = 0
for i in range(number):
increased_number += 1
q.put(increased_number)
return
if __name__ == "__main__":
start_time = time.time()
q = Queue()
p1 = Process(target=worker, args=(1, 50000000, q))
p2 = Process(target=worker, args=(2, 50000000, q))
p1.start()
p2.start()
p1.join()
p2.join()
print("--- %s seconds ---" % (time.time() - start_time))
q.put('exit')
total = 0
while True:
tmp = q.get()
if tmp == 'exit':
break
else:
total += tmp
print("total_number=", end=""), print(total)
print("end of main")
3초대 로 떨어진걸 알 수 있습니다.
2배 이상을 효율이 있군요!! (Good Good!)
응용 문제풀기
- 프로세스의 IPC 방법은 큐, 공유메모리, 파이프 등이 있습니다. 위의 예제는 각각의 프로세스에서 공통의 큐를 사용해서 증가시킨 값을 넣어서 메인 쓰레드에서 다시 꺼내와서 합해서 총 증가시킨 값을 구하는 예제 였습니다.
공유 메모리 방식으로 두개의 프로세스로 50000000씩 증가시키고 공유 메모리를 사용해서 세마포어로 동기화 시켜 최종 값으로 1억을 만드는 코드를 구현해 보세요.
해당 문제는 2개로 분리하시면 편리합니다.
환경설정
- shared memory 는 파이썬 3.8 이상에서 사용할 수 있으므로 파이썬 3.8 이상으로 구현할 것
- numpy 설치가 필요할 수 있음(numpy로 안해도 됩니다.)ㅋㅋㅋ
참고자료
- 멀티 프로세스 : https://docs.python.org/3/library/multiprocessing.html
- 공유 메모리 : https://docs.python.org/3/library/multiprocessing.shared_memory.html
- 세마포어 : https://docs.python.org/3/library/threading.html#semaphore-objects
위 참고 자료를 꼭 읽어 보고 오시기 바랍니다. ^^
numpy는 pands와 같이 쓰면 배열을 자기 마음대로 꾸밀수 있는 엄청난 모듈이죠(I like it)
1. 공유 메모리 방식으로 두개의 프로세스로 50000000씩 증가 코드 작성
from multiprocessing import Process, shared_memory
import numpy as np
import time
def worker(id, number, np_array, shm):
b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
for i in range(number):
b[0] += 1
if __name__ == "__main__":
np_array = np.array([0])
start_time = time.time()
shm = shared_memory.SharedMemory(create=True, size=np_array.nbytes)
b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
th1 = Process(target=worker, args=(1, 50000000, np_array, shm))
th2 = Process(target=worker, args=(2, 50000000, np_array, shm))
th1.start()
th2.start()
th1.join()
th2.join()
print("--- %s seconds ---" % (time.time() - start_time))
print(f"sheard memory : {b[0]}")
shm.close()
shm.unlink()
print("total_number=", end="")
print("end of main")
중요 포인트
1. shm = shared_memory.SharedMemory(create=True, size=np_array.nbytes)
shared_memory 메모리 영역을 만듭시다.
프로세스는 쓰레드와 달리 자원공유가 안됩니다.!! 공유를 원하는 함수 내에서 해당 공간을 똑 같이 찾아야 합니다.
또 다른 방법으로 shm변수의 name값을 주면 됩니다.
앗!! 메모리 선점 때문에 1억이 안나오는 군요 ㅠㅠ
2. 1번 만든걸 활용해서 세마포어로 동기화 시켜 최종 값으로 1억을 만들기
from multiprocessing import Process, shared_memory, Semaphore
import numpy as np
import time
def worker(id, number, np_array, shm, sam):
b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
sam.acquire()
for i in range(number):
b[0] += 1
sam.release()
if __name__ == "__main__":
sam = Semaphore(1)
np_array = np.array([0])
start_time = time.time()
shm = shared_memory.SharedMemory(create=True, size=np_array.nbytes)
b = np.ndarray(np_array.shape, dtype=np_array.dtype, buffer=shm.buf)
th1 = Process(target=worker, args=(1, 50000000, np_array, shm, sam))
th2 = Process(target=worker, args=(2, 50000000, np_array, shm, sam))
th1.start()
th2.start()
th1.join()
th2.join()
# print(b[0], 123)
# existing_shm.close()
print("--- %s seconds ---" % (time.time() - start_time))
print(f"sheard memory : {b[0]}")
shm.close()
shm.unlink()
print("total_number=", end="")
print("end of main")
세마포어를 이용하여 동기화를 시켰습니다.
중요 포인트
쓰레드는 함수 위에 아무때나 세마포어를 선언해도 메모리 공유가 되기 때문에 별문제없이 실행되지만
2개의 프로세스의 쓰는 지금 같은 세마포어 Key를 쓰기위해 Main에서 만든 세파포어를 공유해야
다른 프로세스간에 공유가 됩니다.
해당 이해는 쓰레드와 프로세스의 차이점을 명확이 이해되시면 해결 됩니다.
'Python' 카테고리의 다른 글
[Python] asyncio (0) | 2020.07.02 |
---|---|
[Python] coroutine (코루틴) (1) | 2020.06.29 |
[Python] Thread (0) | 2020.06.29 |
[Python] Lambda 실습 (0) | 2020.06.28 |
[Python] import types (0) | 2020.06.28 |
댓글