What is this post for?
I wanted to get to know the parts of Python's standard library that deal with multithreading.
I have written about threads themselves before:
Are Python threads native threads? - CLOVER🍀
Using Python's TCPServer/HTTPServer with multiple threads - CLOVER🍀
This time I'll look at the standard library features for coordinating threads, such as locks and semaphores.
The threading library
The documentation page for the threading library is here.
threading --- Thread-based parallelism - Python 3.10.15 documentation
It contains the following APIs and classes.
- Thread-local data … sets a value that is specific to each thread
- Lock … can be locked and unlocked; while one thread holds the lock, any other thread that tries to acquire it waits until the lock is released
- RLock … a reentrant lock; unlike Lock, the same thread can acquire it recursively
- Condition … an object associated with a lock that lets threads wait and be woken up through wait, notify, and notify_all
- Semaphore … a semaphore, which limits how many threads can run a given region at the same time
- Event … a mechanism for communication between threads in which one thread signals an event and the other threads wait for it
- Timer … runs a thread after a given amount of time has passed
- Barrier … makes multiple threads wait for each other at a meeting point
Note that Lock, RLock, Condition, and Semaphore can be used as context managers.
Using locks, conditions, and semaphores in the with statement
More precisely, objects that take a lock with acquire and free it with release can be used as context managers: acquire is called on entering the with block, and release is called on leaving it.
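As a rough sketch of the with behavior described above, the following checks whether the lock is held inside and after the block. The variable names (held_inside and so on) are my own for illustration; Lock#locked is the standard method that reports whether the lock is currently held.

```python
import threading

lock = threading.Lock()

# Entering the block calls lock.acquire(); leaving it calls lock.release().
with lock:
    held_inside = lock.locked()

held_after = lock.locked()

# The lock is also released when the block exits through an exception.
try:
    with lock:
        raise ValueError("something went wrong")
except ValueError:
    pass

held_after_error = lock.locked()

print(held_inside, held_after, held_after_error)  # True False False
```

The third value is the useful one: the release happens even on the error path, which is the same guarantee a hand-written try/finally would give.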
By the way, threads in Python (CPython) are subject to the GIL, so within a single process only one thread can execute Python code at any given moment. Multithreading in Python pays off when you want to run I/O-bound work concurrently.
CPython implementation detail: In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor. However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
Also, rather than handling threads directly, I think it is better to use ThreadPoolExecutor from concurrent.futures.
concurrent.futures -- Launching parallel tasks - Python 3.10.15 documentation
This time I'll try out these features.
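To make the point about I/O-bound work concrete, here is a minimal sketch that runs four waits on a ThreadPoolExecutor. The function fake_io is a hypothetical stand-in of my own; time.sleep plays the role of a blocking I/O call, and it releases the GIL while it waits.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def fake_io(seconds: float) -> float:
    # time.sleep stands in for a blocking I/O call (hypothetical workload).
    time.sleep(seconds)
    return seconds


started = time.monotonic()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_io, [0.2, 0.2, 0.2, 0.2]))
elapsed = time.monotonic() - started

# The four waits overlap, so the total should be well under the
# 0.8 seconds a sequential run would need.
print(f"results={results}, elapsed={elapsed:.2f}s")
```

A CPU-bound function in place of fake_io would not be expected to speed up the same way, which is where multiprocessing or ProcessPoolExecutor comes in.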
Environment
Here is the environment used this time.
$ python3 --version
Python 3.10.12

$ pip3 --version
pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)
Preparation
I'll check the behavior with pytest, and I'm also installing mypy for type checking.
$ pip3 install pytest mypy
The list of installed libraries.
$ pip3 list
Package           Version
----------------- -------
exceptiongroup    1.2.2
iniconfig         2.0.0
mypy              1.13.0
mypy-extensions   1.0.0
packaging         24.2
pip               22.0.2
pluggy            1.5.0
pytest            8.3.3
setuptools        59.6.0
tomli             2.1.0
typing_extensions 4.12.2
The behavior is verified with test code written for pytest; the skeleton is below.
tests/test_threading.py
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
import time


def log(message: str) -> None:
    print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")
Since the tests write to standard output, I run pytest with the --capture=no option.
$ pytest --capture=no
Now let's try things out.
Thread-local data
First up is thread-local data.
Thread-Local Data
Thread-local data is a mechanism for holding data that belongs to one specific thread. It is handy because even code written as if it ran on a single thread can run on multiple threads and still handle each thread's data independently.
Sample code.
def test_thread_local_data() -> None:
    results = {}
    localdata = threading.local()

    def thread1() -> None:
        time.sleep(3)
        localdata.mydata = "Hello from thread1"
        time.sleep(2)
        log(f"thread1 data = {localdata.mydata}")
        assert localdata.mydata == "Hello from thread1"
        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)
        localdata.mydata = "Hello from thread2"
        time.sleep(3)
        log(f"thread2 data = {localdata.mydata}")
        assert localdata.mydata == "Hello from thread2"
        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))
        [f.result() for f in futures]

    assert len(results) == 2
Thread-local data is represented by an object obtained from threading.local.
localdata = threading.local()
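Values are stored on the thread-local object as attributes, much like entries in a per-thread dictionary.
Every thread writes to the same attribute of the same object, yet each thread still sees the value it set itself.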
    def thread1() -> None:
        time.sleep(3)
        localdata.mydata = "Hello from thread1"
        time.sleep(2)
        log(f"thread1 data = {localdata.mydata}")
        assert localdata.mydata == "Hello from thread1"
        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)
        localdata.mydata = "Hello from thread2"
        time.sleep(3)
        log(f"thread2 data = {localdata.mydata}")
        assert localdata.mydata == "Hello from thread2"
        results[threading.current_thread().name] = "done"
Standard output.
[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1
[2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2
As you can see, this mechanism lets each thread manage its own values.
Incidentally, you can also build your own thread-local data by subclassing local, but the documentation hardly explains this, and there is no API description either.
Apparently the answer is to read the source code for details.
For more details and extensive examples, see the documentation string of the _threading_local module.
https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py
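Going by the docstring in that module, a subclass of local can define __init__, which is then run separately in each thread that uses the object. Below is a minimal sketch of that idea. The class RequestContext and its user attribute are hypothetical names of my own, and I use plain threading.Thread here so that each worker is guaranteed to be a fresh thread.

```python
import threading


class RequestContext(threading.local):
    # Hypothetical subclass for illustration. __init__ runs once in every
    # thread that touches the object, so each thread gets its own defaults.
    def __init__(self, user: str = "anonymous") -> None:
        self.user = user


context = RequestContext()
results: dict[str, tuple[str, str]] = {}


def worker(name: str) -> None:
    initial_user = context.user  # the default, initialized for this thread
    context.user = name          # visible only to this thread
    results[name] = (initial_user, context.user)


threads = [threading.Thread(target=worker, args=(name,)) for name in ("alice", "bob")]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(results)
print(context.user)  # the main thread still sees its own value: anonymous
```

Each worker starts from the default value rather than from whatever another thread assigned, which is the main thing a subclass adds over a bare threading.local().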
Lock and RLock
Next are Lock and RLock.
Let's start with Lock.
def test_lock() -> None:
    lock = threading.Lock()
    results = {}

    def with_lock() -> None:
        log("try lock")
        lock.acquire()
        try:
            log("start")
            time.sleep(2)
            log("end")
            results[threading.current_thread().name] = "done"
        finally:
            lock.release()

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))
        [f.result() for f in futures]

    assert len(results) == 2
Create a Lock,
lock = threading.Lock()
and acquire it with Lock#acquire. Only one thread can hold the lock at a time; any other thread that calls Lock#acquire is kept waiting until the lock is released with Lock#release.
    def with_lock() -> None:
        log("try lock")
        lock.acquire()
        try:
            log("start")
            time.sleep(2)
            log("end")
            results[threading.current_thread().name] = "done"
        finally:
            lock.release()
That is why the lock has to be released reliably in a finally block.
Looking at the output, you can see that the second thread is kept waiting until the thread that acquired the lock first releases it.
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end
Since Lock supports the context manager protocol, the same thing can be written more simply using with.
        with lock:
            log("start")
            time.sleep(2)
            log("end")
            results[threading.current_thread().name] = "done"
This form is preferable because you cannot forget to call Lock#release. From here on I'll handle locks using with.
Note that with Lock, even the thread that already holds the lock is kept waiting if it calls Lock#acquire again before releasing it.
In other words, if you write code like the following, the thread that acquired the lock stops for good.
def test_lock_reentrant() -> None:
    lock = threading.Lock()
    results = {}

    def with_lock() -> None:
        log("try lock")
        with lock:
            log("start")
            time.sleep(2)

            log("reentrant lock")
            with lock:  # execution never gets past this point
                log("do something")

            log("release reentrant lock")
            log("end")
            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))
        [f.result() for f in futures]

    assert len(results) == 2
When this is run, the output stops here.
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock
In other words, Lock is not reentrant.
If you need a reentrant lock, use RLock.
Here is the previous code rewritten to use RLock.
def test_rlock() -> None:
    lock = threading.RLock()
    results = {}

    def with_lock() -> None:
        with lock:
            log("start")
            time.sleep(2)

            log("reentrant lock")
            with lock:
                log("do something")

            log("release reentrant lock")
            log("end")
            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))
        [f.result() for f in futures]

    assert len(results) == 2
The only change is creating an RLock where the Lock used to be created; otherwise it is used the same way as Lock.
lock = threading.RLock()
Because RLock is reentrant, however, even code in which one thread calls acquire twice on the same lock instance, which did not work with Lock,
    def with_lock() -> None:
        with lock:
            log("start")
            time.sleep(2)

            log("reentrant lock")
            with lock:
                log("do something")

            log("release reentrant lock")
            log("end")
            results[threading.current_thread().name] = "done"
now runs like this.
[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end
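The difference between Lock and RLock can also be observed without hanging the thread, by asking for the lock in non-blocking mode. This is only a small sketch; the variable names are my own, while blocking=False is a standard parameter of acquire that makes it return False instead of waiting.

```python
import threading

lock = threading.Lock()
rlock = threading.RLock()

with lock:
    # A second acquire from the same thread fails. With the default
    # blocking=True it would wait forever instead of returning False.
    second_lock_acquire = lock.acquire(blocking=False)

with rlock:
    # RLock remembers the owning thread, so the owner can acquire it again.
    second_rlock_acquire = rlock.acquire(blocking=False)
    if second_rlock_acquire:
        rlock.release()  # every successful acquire needs a matching release

print(second_lock_acquire, second_rlock_acquire)  # False True
```

Passing a timeout to acquire works in a similar way and can be a reasonable safety net when it is not obvious whether a lock might already be held.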
Condition
A Condition is an object associated with a lock that can make threads wait and wake them up again.
Here is a sample.
def test_condition() -> None:
    condition = threading.Condition()
    results = {}

    def wait_task() -> None:
        with condition:
            log("waiting...")
            condition.wait()
            log("wakeup")
            results[threading.current_thread().name] = "done"

    def notify_task() -> None:
        with condition:
            log("notify")
            condition.notify_all()
            log("done")
            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(wait_task))
        futures.append(executor.submit(wait_task))
        time.sleep(3)
        futures.append(executor.submit(notify_task))
        [f.result() for f in futures]

    assert len(results) == 3
A Condition instance is created with its constructor; if no argument is given, it creates an RLock instance internally.
condition = threading.Condition()
If you do pass an argument, it must be a Lock or RLock instance.
Condition#wait makes the thread wait. Operations on a Condition must be performed while holding its lock.
    def wait_task() -> None:
        with condition:
            log("waiting...")
            condition.wait()
            log("wakeup")
            results[threading.current_thread().name] = "done"
Waiting threads can then be woken with Condition#notify or Condition#notify_all.
    def notify_task() -> None:
        with condition:
            log("notify")
            condition.notify_all()
            log("done")
            results[threading.current_thread().name] = "done"
Condition#notify wakes one thread, or at most the number of threads you specify, while Condition#notify_all wakes every waiting thread.
Here is the output.
[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting...
[2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting...
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done
[2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup
[2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup
Incidentally, there is also an API called Condition#wait_for, which keeps the thread waiting until the function passed as its argument returns True.
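I did not try Condition#wait_for in the tests above, so the following is just a minimal sketch of how it might be used. The items and received lists and the producer/consumer split are assumptions of mine for illustration.

```python
import threading

condition = threading.Condition()
items: list[str] = []
received: list[str] = []


def consumer() -> None:
    with condition:
        # wait_for checks the predicate first, then releases the lock and
        # waits, re-checking each time the thread is notified.
        condition.wait_for(lambda: len(items) > 0)
        received.append(items.pop())


def producer() -> None:
    with condition:
        items.append("job-1")
        condition.notify()


consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
producer()
consumer_thread.join(timeout=5)

print(received)  # ['job-1']
```

Because the predicate is checked before waiting, this works even if the producer happens to run before the consumer reaches wait_for, a case in which a bare wait would miss the notification.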
Semaphore
A Semaphore limits the number of threads that can run a given region at the same time.
Here is the sample code.
def test_semaphore() -> None:
    semaphore = threading.Semaphore(2)
    results = {}

    def with_semaphore() -> None:
        log("acquire semaphore")
        with semaphore:
            log("enter semaphore")
            time.sleep(2)
            log("leave semaphore")
            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        [f.result() for f in futures]

    assert len(results) == 4
A Semaphore is created by passing the number of threads allowed to run at once to its constructor. Here I specify 2.
semaphore = threading.Semaphore(2)
After that, you mark the guarded region the same way as with Lock or RLock.
    def with_semaphore() -> None:
        log("acquire semaphore")
        with semaphore:
            log("enter semaphore")
            time.sleep(2)
            log("leave semaphore")
            results[threading.current_thread().name] = "done"
This run uses four threads, and the third and fourth threads cannot acquire the semaphore and keep waiting until one of the first two leaves.
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore
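Instead of reading timestamps, the limit can also be checked in code by counting how many threads are inside the guarded region at once. This is a sketch under my own assumptions: the stats dictionary and counter_lock are helpers I introduced, not part of the tests above.

```python
from concurrent.futures import ThreadPoolExecutor
import threading
import time

semaphore = threading.Semaphore(2)
counter_lock = threading.Lock()  # protects the counters below
stats = {"running": 0, "peak": 0}


def limited_task() -> None:
    with semaphore:
        with counter_lock:
            stats["running"] += 1
            stats["peak"] = max(stats["peak"], stats["running"])
        time.sleep(0.1)  # pretend to do some work inside the region
        with counter_lock:
            stats["running"] -= 1


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(limited_task) for _ in range(4)]
    for future in futures:
        future.result()

# The peak never exceeds the value passed to Semaphore.
print(f"peak concurrency = {stats['peak']}")
```

The counters need their own Lock because the read-modify-write on the dictionary is not atomic across threads.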
Event
An Event is a mechanism in which an Event object acts as an intermediary: when one thread signals the event, the threads waiting for a notification from that Event wake up.
Here is the sample code.
def test_event() -> None:
    event = threading.Event()
    results = {}

    def wait_event() -> None:
        log("wait...")
        event.wait()
        log("wake up")
        results[threading.current_thread().name] = "done"

    def set_event() -> None:
        log("before set event")
        time.sleep(2)
        event.set()
        log("after set event")
        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(wait_event))
        futures.append(executor.submit(wait_event))
        time.sleep(2)
        futures.append(executor.submit(set_event))
        [f.result() for f in futures]

    assert len(results) == 3
Creating the Event.
event = threading.Event()
A waiting thread waits for the notification with Event#wait.
    def wait_event() -> None:
        log("wait...")
        event.wait()
        log("wake up")
        results[threading.current_thread().name] = "done"
The signalling thread calls Event#set, which lets the waiting threads return from Event#wait.
    def set_event() -> None:
        log("before set event")
        time.sleep(2)
        event.set()
        log("after set event")
        results[threading.current_thread().name] = "done"
The result.
[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait...
[2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait...
[2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up
[2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up
You can see that the two threads were waiting for Event#set.
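A few details of Event are not covered by the test above: Event#wait accepts a timeout and reports whether the event was set, and the flag stays set until Event#clear is called. Here is a small sketch of those points; the helper set_later and the variable names are my own.

```python
import threading
import time

event = threading.Event()

# wait() returns False when the timeout expires before the event is set.
timed_out_result = event.wait(timeout=0.1)


def set_later() -> None:
    time.sleep(0.1)
    event.set()


setter = threading.Thread(target=set_later)
setter.start()
signalled_result = event.wait(timeout=5)  # True once set() has been called
setter.join()

# The flag stays set, so later waits return immediately until clear().
still_set = event.is_set()
event.clear()
after_clear = event.is_set()

print(timed_out_result, signalled_result, still_set, after_clear)
# False True True False
```

Because the flag is sticky, a thread that calls wait after set has already happened does not block, which differs from Condition#wait.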
Barrier
With a Barrier, multiple threads can wait for each other at a meeting point.
Here is the sample code.
def test_barrier() -> None:
    barrier = threading.Barrier(3)
    results = {}

    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)
        barrier.wait()
        log("thread1 wakeup")
        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)
        barrier.wait()
        log("thread2 wakeup")
        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)
        barrier.wait()
        log("thread3 wakeup")
        results["thread3"] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []
        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))
        futures.append(executor.submit(thread3))
        [f.result() for f in futures]

    assert results["thread1"] == "done"
    assert results["thread2"] == "done"
    assert results["thread3"] == "done"
A Barrier is created by passing the number of threads that should meet to its constructor.
barrier = threading.Barrier(3)
After that, a thread that calls Barrier#wait waits there, and once the number of threads given to the constructor have reached their Barrier#wait call, they all start moving again.
    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)
        barrier.wait()
        log("thread1 wakeup")
        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)
        barrier.wait()
        log("thread2 wakeup")
        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)
        barrier.wait()
        log("thread3 wakeup")
        results["thread3"] = "done"
So the result looks like this.
[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec...
[2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup
The other threads wait until the last thread calls Barrier#wait, and at that moment all of the waiting threads start moving at once.
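Two more Barrier features may be worth a quick look: Barrier#wait returns a different integer in each thread, and the constructor accepts an action callable that one of the threads runs just before they are all released. The sketch below is my own illustration; PARTIES, on_all_arrived, and the two lists are hypothetical names.

```python
import threading

PARTIES = 3
arrival_indexes: list[int] = []
action_calls: list[str] = []


def on_all_arrived() -> None:
    # Runs in exactly one of the threads, just before they are all released.
    action_calls.append("released")


barrier = threading.Barrier(PARTIES, action=on_all_arrived)


def worker() -> None:
    # wait() returns a different integer from 0 to PARTIES - 1 in each thread.
    index = barrier.wait(timeout=5)
    arrival_indexes.append(index)


threads = [threading.Thread(target=worker) for _ in range(PARTIES)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(arrival_indexes), action_calls)  # [0, 1, 2] ['released']
```

The return value offers a simple way to pick one thread for follow-up work, for example by letting only the thread that received 0 do it.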
Timer
Last is Timer. It runs a task after the specified amount of time.
Here is the sample code. I used a Barrier to synchronize the threads.
def test_timer() -> None:
    results = {}
    barrier = threading.Barrier(2)

    def task() -> None:
        log("execute task")
        results[threading.current_thread().name] = "done"
        barrier.wait()

    log("register task")
    timer = threading.Timer(3, task)
    timer.start()

    barrier.wait()

    assert len(results) == 1
The Timer constructor takes the number of seconds to wait before the task starts and the task itself, given as a function.
timer = threading.Timer(3, task)
Here is the result. The task runs 3 seconds later.
[2024-11-16 21:18:39] MainThread - register task
[2024-11-16 21:18:42] Thread-1 - execute task
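A Timer can also be stopped before it fires by calling Timer#cancel, and arguments for the task can be passed through args. The following is a short sketch with names of my own choosing (fired, task, and the two timers).

```python
import threading

fired: list[str] = []


def task(label: str) -> None:
    fired.append(label)


# cancel() only has an effect while the timer is still waiting.
cancelled_timer = threading.Timer(0.5, task, args=("cancelled",))
cancelled_timer.start()
cancelled_timer.cancel()
cancelled_timer.join()

kept_timer = threading.Timer(0.1, task, args=("kept",))
kept_timer.start()
kept_timer.join()  # Timer is a Thread subclass, so join() waits for the task

print(fired)  # ['kept']
```

Since Timer is a subclass of Thread, join could also replace the Barrier in the test above when all that is needed is to wait for the task to finish.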
That about covers it.
Wrapping up
I tried out a range of the thread-related features in Python's standard library.
I don't expect to use them all that often, but they all look worth knowing when working with multiple threads, so I'll keep them in mind.