Python의 Thread 사용에 대한 이해

게시일 : 2019년 04월 19일    
# Python # Thread # threading

Python에서 Thread에 대해 알아본다.

An Intro to Threading in Python을 기반으로 작성하였다.


Thread는 프로그램 내에서, 특히 프로세스 내에서 실행되는 흐름의 단위를 말한다. (a separate flow of execution)

파이썬에서 Multi-Threading은 GIL때문에 I/O bound Program에 사용하기 적합하다.

Python에서는 기본으로 threading이라는 라이브러리를 활용할 수 있다.

import threading

def add(res, num):
    print("start add func")
    for i in range(num):
        res +=i
    print("end add func")

# target : 실행할 함수
thread = threading.Thread( target=add, args=(0,1000) )
thread.start() # 스레드 시작
start add func
end add func

args : tuple 타입 ( parameter가 1개라면 (,) 형태를 사용해야 한다. )

Non-daemon VS Daemon Threads

Non-daemon Threads

메인 스레드의 모든 코드를 수행하더라도 모든 스레드의 작업이 끝나야 프로그램이 종료된다. (대기 시간이 생김)

import logging
import threading
import time


def thread_function(name):'Thread {name}: starting')
    time.sleep(2)'Thread {name}: finishing')"Main    : before creating thread")
x = threading.Thread(target=thread_function, args=(1,))"Main    : before running thread")
x.start()"Main    : wait for the thread to finish")"Main    : all done. Waiting non-daemon threads to finish")
INFO:root:Main    : before creating thread
INFO:root:Main    : before running thread
INFO:root:Thread 1: starting
INFO:root:Main    : wait for the thread to finish
INFO:root:Main    : all done. Waiting non-daemon threads to finish
INFO:root:Thread 1: finishing

Daemon Threads

데몬 스레드는 백그라운드에서 실행되는 스레드이다.(a process that runs in the background)

프로그램이 종료될 때 데몬 스레드도 함께 종료되는 특징을 가지고 있다.

# Thread 생성 시 daemon=True를 입력한다."Main    : before creating thread")
x = threading.Thread(target=thread_function, args=(1,), daemon=True)"Main    : before running thread")
x.start()"Main    : wait for the thread to finish")"Main    : all done. Exit the program and kill daemon threads")
INFO:root:Main    : before creating thread
INFO:root:Main    : before running thread
INFO:root:Thread 1: starting
INFO:root:Main    : wait for the thread to finish
INFO:root:Main    : all done. Exit the program and kill daemon threads
INFO:root:Thread 1: finishing

.join() : main스레드가 다른 스레드 작업이 끝나기를 기다린다.

threads = list()
for index in range(3):"Main    : create and start thread %d.", index)
    x = threading.Thread(target=thread_function, args=(index,))

for index, thread in enumerate(threads):
    thread.join() # 각각의 thread가 끝나기를 기다린다."Main    : thread %d done", index)
INFO:root:Main    : create and start thread 0.
INFO:root:Thread 0: starting
INFO:root:Main    : create and start thread 1.
INFO:root:Thread 1: starting
INFO:root:Main    : create and start thread 2.
INFO:root:Thread 2: starting
INFO:root:Thread 0: finishing
INFO:root:Main    : thread 0 done
INFO:root:Thread 1: finishing
INFO:root:Main    : thread 1 done
INFO:root:Thread 2: finishing
INFO:root:Main    : thread 2 done

참고 : 스레드가 실행되는 순서는 운영 체제에 의해 결정되기 때문에 실행순서가 코드와 같음을 보장할 수 없다.

ThreadPoolExecutor - 코드 실행 순서가 곧 Thread 실행순서

ThreadPoolExecutor은 concurrent.futures 라이브러리의 일부이다. (Python 3.2 부터 사용가능)

max_worker : thread의 개수

.map : list와 같은 형태로 args를 전달하면 thread에 하나씩 전달된다. (한 번에 실행)

.submit : thread마다 args를 각각 전달하는 코드 (반복 실행)

import concurrent.futures
import logging
import time

def thread_function(name):"Thread %s: starting", name)
    time.sleep(2)"Thread %s: finishing", name)

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: # with와 함께 사용을 권장, [0,1,2]) # 3개
    # executor.submit(thread_function, 0) \ executor.submit(thread_function, 1) \ ...
INFO:root:Thread 0: starting
INFO:root:Thread 1: starting
INFO:root:Thread 2: starting
INFO:root:Thread 0: finishing
INFO:root:Thread 1: finishing
INFO:root:Thread 2: finishing

ThreadPoolExecutor 사용 시 주의사항

parameter가 없는 function에 args를 전달하면 아무런 output없이 프로그램이 종료된다.

(excpetion이 발생하는 데 threadpoolexecutor는 이 exception을 숨기기 때문)

Race condition

두 개 이상의 thread가 공유 자원을 사용할 때 잘못된 결과가 발생할 수 있는 상황을 말한다.


초기값이 0인 DB에 thread 실행 시 하나씩 값을 올리려는 프로그램

각각의 thread는 DB의 값을 변수에 복사 후에 1을 더한 값을 다시 DB에 write한다.

class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):"Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        self.value = local_copy"Thread %s: finishing update", name)
import logging
import concurrent.futures
import time

database = FakeDatabase()"Testing update. Starting value is %d.", database.value)

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for index in range(1,4):
        executor.submit(database.update, index)"Testing update. Ending value is %d.", database.value)
INFO:root:Testing update. Starting value is 0.
INFO:root:Thread 1: starting update
INFO:root:Thread 2: starting update
INFO:root:Thread 3: starting update
INFO:root:Thread 1: finishing update
INFO:root:Thread 2: finishing update
INFO:root:Thread 3: finishing update
INFO:root:Testing update. Ending value is 1.

Ending value가 3이어야 하는데 race condition으로 인해 1이 되었다.

thread가 CPU를 사용하지 않을 때 context switching이 발생해 다른 thread가 실행된다. 그 시점은 정확히 알 수 없다.

이 특성으로 인해 Thread1이 DB에 1을 write하기 전에 Thread2가 먼저 DB의 0인 값을 가져가버리기 때문이다.

race conditon 해결방법 1 : Lock

공유 자원을 사용중인 스레드가 DB에 value를 write할 때까지 다른 스레드가 접근을 못하게 막을 수 있다.

lock은 acquire & release 함수를 통해 사용할 수 있다.

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock() # lock 생성

    def locked_update(self, name):"Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock: # with문 활용(acquire, release 자동 적용)
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)"Thread %s: finishing update", name)
import logging
import concurrent
import time
import threading

if __name__ == "__main__":
    database = FakeDatabase()"Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.locked_update, index)"Testing update. Ending value is %d.", database.value)
02:23:44: Testing update. Starting value is 0.
02:23:44: Thread 0: starting update
02:23:44: Thread 1: starting update
02:23:44: Thread 0 about to lock
02:23:44: Thread 1 about to lock
02:23:44: Thread 0 has lock
02:23:45: Thread 0 about to release lock
02:23:45: Thread 0 after release
02:23:45: Thread 1 has lock
02:23:45: Thread 0: finishing update
02:23:45: Thread 1 about to release lock
02:23:45: Thread 1 after release
02:23:45: Thread 1: finishing update
02:23:45: Testing update. Ending value is 2.

Lock의 문제점

  1. Deadlock - Lock이 release 되지 않아 다른 스레드가 lock을 얻지 못해 무한 대기하는 경우를 주로 말한다.
  2. lock 사용이 많아질 경우 acquire & release 과정에서 overhead가 발생

참고 : RLock은 lock이 release되지 않아도 acquire을 호출할 수 있는 object이다. R : reentrant (다시 들어간다는 뜻)

race conditon 해결방법 2 : Queue를 활용한 예제 ( Queue는 thread-safe하다. )

참고 : producer-consumer-using-queue