CLOVER🍀

That was when it all began.

Pythonでスレッドに関する暙準ラむブラリヌスレッドロヌカルデヌタ、Lock、Condition、Semaphore、Event、Timer、Barrierを扱う

これは、なにをしたくお曞いたもの

Pythonで、マルチスレッドに関する暙準ラむブラリヌを知っおおきたいなず思いたしお。

ちなみにスレッド自䜓は過去にも扱っおいたす。

Pythonのスレッドは、ネイティブスレッドなのか? - CLOVER🍀

PythonのTCPServer/HTTPServerをマルチスレッドで使う - CLOVER🍀

今回はロックやセマフォずいったスレッド間に関する暙準ラむブラリヌを芋おいきたす。

threadingラむブラリヌ

threadingラむブラリヌのペヌゞはこちら。

threading --- スレッドベースの並列処理 — Python 3.10.15 ドキュメント

こちらには以䞋のAPIやクラスが含たれおいたす。

  • スレッドロヌカルデヌタ 
 スレッドごずに固有の倀を蚭定する
  • Lock 
 ロック、アンロックが可胜で、特定のスレッドがロックを獲埗しおいる時に他のスレッドがロックを獲埗しようずするず、先に獲埗されたロックがアンロックされるたで埅機する
  • RLock 
 再入可胜ロックReentrant Lock。Lockず異なり同じスレッドが再垰的にロックを獲埗可胜
  • Condition 
 ロックに関連付けられたうえで、waitnotifynotify_allでスレッドの埅機起動を操䜜できるオブゞェクト
  • Semaphore 
 いわゆるセマフォで、ある範囲に察しお同時に実行できるスレッド数を制限する仕組み
  • Event 
 あるスレッドがむベントを発信し、他のスレッドはむベントの発信を埅぀ずいうスレッド間通信を行う仕組み
  • Timer 
 䞀定時間埌にスレッドを実行する仕組み
  • Barrier 
 耇数のスレッドの埅ち合わせを行う仕組み

なお、Lock、RLock、Condition、Semaphoreはコンテキストマネヌゞャヌずしお䜿えたす。

with 文でのロック・条件倉数・セマフォの䜿い方

具䜓的にはロックの獲埗をacquireで行い、解攟にreleaseを䜿うものはコンテキストマネヌゞャヌずしお䜿えるようになっおいお、
withブロックに入る時にacquireが呌び出されwithブロックを抜ける時にreleaseが呌び出されたす。

ずころで、PythonにおけるスレッドはGILがあるので1プロセス内で同時に実行できるスレッドはひず぀だけです。Pythonでマルチスレッドが
有効なのはIOバりンドな凊理を䞊列しお実行したい時ですね。

CPython 実装の詳现: CPython は Global Interpreter Lock のため、ある時点で Python コヌドを実行できるスレッドは1぀に限られたす (ただし、いく぀かのパフォヌマンスが匷く求められるラむブラリはこの制限を克服しおいたす)。アプリケヌションにマルチコアマシンの蚈算胜力をより良く利甚させたい堎合は、 multiprocessing モゞュヌルや concurrent.futures.ProcessPoolExecutor の利甚をお勧めしたす。 ただし、I/Oバりンドなタスクを䞊行しお耇数走らせたい堎合においおは、 マルチスレッドは正しい遞択肢です。

ちなみにスレッド自䜓を盎接扱うのではなく、concurrent.futuresのThreadPoolExecutorを䜿うのがよいず思いたす。

concurrent.futures -- 並列タスク実行 — Python 3.10.15 ドキュメント

今回はこのあたりを詊しおみたいず思いたす。

環境

今回の環境はこちら。

$ python3 --version
Python 3.10.12


$ pip3 --version
pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)

準備

確認はpytestで行いたいず思いたす。型チェックにmypyも入れおおきたす。

$ pip3 install pytest mypy

むンストヌルされたラむブラリヌの䞀芧。

$ pip3 list
Package           Version
----------------- -------
exceptiongroup    1.2.2
iniconfig         2.0.0
mypy              1.13.0
mypy-extensions   1.0.0
packaging         24.2
pip               22.0.2
pluggy            1.5.0
pytest            8.3.3
setuptools        59.6.0
tomli             2.1.0
typing_extensions 4.12.2

動䜜確認はpytestを䜿ったテストコヌドで行いたすが、雛圢はこちらです。

tests/test_threading.py

from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
import time

def log(message: str) -> None:
    print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")

たたテスト䞭に暙準出力ぞの曞き出しを行うので、pytestは--capture=noオプションを指定しお実行したす。

$ pytest --capture=no

では、詊しおいっおみたしょう。

スレッドロヌカルデヌタ

最初はスレッドロヌカルデヌタから。

スレッドロヌカルデヌタ

スレッドロヌカルデヌタは、そのスレッド固有のデヌタを持たせる仕組みです。あたかも単䞀スレッド前提のような䜿い方をするコヌドで
耇数スレッドで実行しおも、それぞれのデヌタが独立しお扱えるので䟿利です。

サンプルコヌド。

def test_thread_local_data() -> None:
    results = {}

    localdata = threading.local()

    def thread1() -> None:
        time.sleep(3)

        localdata.mydata = "Hello from thread1"

        time.sleep(2)

        log(f"thread1 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread1"

        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)

        localdata.mydata = "Hello from thread2"

        time.sleep(3)

        log(f"thread2 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread2"

        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))

        [f.result() for f in futures]

        assert len(results) == 2

スレッドロヌカルデヌタは、threading.localで取埗したオブゞェクトで衚珟されたす。

    localdata = threading.local()

スレッドロヌカルデヌタは蟞曞のように扱えたす。

同じオブゞェクトに各スレッドが同じ属性に曞き蟌んでいたすが、それぞれのスレッドが蚭定した倀がしっかり残っおいたす。

    def thread1() -> None:
        time.sleep(3)

        localdata.mydata = "Hello from thread1"

        time.sleep(2)

        log(f"thread1 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread1"

        results[threading.current_thread().name] = "done"

    def thread2() -> None:
        time.sleep(2)

        localdata.mydata = "Hello from thread2"

        time.sleep(3)

        log(f"thread2 data = {localdata.mydata}")

        assert localdata.mydata == "Hello from thread2"

        results[threading.current_thread().name] = "done"

暙準出力の結果。

[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1
[2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2

このように、スレッドごずに固有の倀を管理できる仕組みです。

ちなみにスレッドロヌカルデヌタはlocalを継承するこずで独自のスレッドロヌカルデヌタを䜜れたりするのですが、ドキュメントにほずんど
説明がありたせん。APIの説明もないですね。

詳しくは゜ヌスコヌドを芋るこず、だそうです。

詳现ず䟋題に぀いおは、 _threading_local モゞュヌルのドキュメンテヌション文字列を参照しおください。

https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py

Lock、RLock

次はLockずRLockです。

たずはLockから。

def test_lock() -> None:
    lock = threading.Lock()

    results = {}
    
    def with_lock() -> None:
        log("try lock")

        lock.acquire()

        try:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"
        finally:
            lock.release()

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

Lockを䜜成しお

    lock = threading.Lock()

Lock#acquireでロックを獲埗できたす。ロックを獲埗できるスレッドはひず぀だけで、他のスレッドがLock#acquireを呌び出した堎合は
Lock#releaseでロックが解攟されるたで埅たされるこずになりたす。

    def with_lock() -> None:
        log("try lock")

        lock.acquire()

        try:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"
        finally:
            lock.release()

なのでfinallyで確実にロックを解攟する必芁がありたす。

暙準出力に曞き出された結果を芋るず、最初にロックを取埗したスレッドがロックを開攟するたで2぀目のスレッドが埅たされおいるのが
確認できたす。

[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end

なお、Lockはコンテキストマネヌゞャヌに察応しおいるのでwithを䜿っおシンプルに曞くこずができたす。

        with lock:
            log("start")

            time.sleep(2)

            log("end")

            results[threading.current_thread().name] = "done"

こちらの方がLock#releaseの呌び出し忘れなどがなくおよいでしょう。以降はwithでロックを扱いたす。

なお、Lockを䜿ったロックの堎合、ロックを獲埗したスレッドであっおもロック解攟前にLock#acquireを呌び出した堎合はロックを取埗できず
埅たされるこずになりたす。

぀たり、以䞋のようなコヌドを曞いおしたうずロックを取埗したスレッドが止たっおしたいたす。

def test_lock_reentrant() -> None:
    lock = threading.Lock()

    results = {}
    
    def with_lock() -> None:
        log("try lock")

        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:  # ここで進たなくなる
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

実行した堎合は、暙準出力がここで停止したす。

[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock

぀たり、Lockは最入可胜ではありたせん。

最入可胜なロックが必芁な堎合はRLockを䜿いたす。

先皋のコヌドをRLockを䜿っお曞き盎したものがこちらです。

def test_rlock() -> None:
    lock = threading.RLock()

    results = {}
    
    def with_lock() -> None:
        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_lock))
        futures.append(executor.submit(with_lock))

        [f.result() for f in futures]

        assert len(results) == 2

Lockでむンスタンスを䜜成しおいたずころをRLockにするだけで、あずはLockず䜿い方は同じですね。

    lock = threading.RLock()

ただしRLockは最入可胜なので、先ほどはLockで動䜜しなかったひず぀のスレッドが同じロックむンスタンスに察しお2回acquireを
呌び出すようなコヌドであっおも

    def with_lock() -> None:
        with lock:
            log("start")

            time.sleep(2)

            log("reentrant lock")

            with lock:
                log("do something")

            log("release reentrant lock")

            log("end")

            results[threading.current_thread().name] = "done"

このように動くようになりたす。

[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end

Condition

Conditionはロックに関連付けられるオボゞェクトで、スレッドを埅機させたり起こしたりできたす。

サンプルはこちら。

def test_condition() -> None:
    condition = threading.Condition()

    results = {}

    def wait_task() -> None:
        with condition:
            log("waiting...")

            condition.wait()

            log("wakeup")

            results[threading.current_thread().name] = "done"

    def notify_task() -> None:
        with condition:
            log("notify")

            condition.notify_all()

            log("done")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(wait_task))
        futures.append(executor.submit(wait_task))

        time.sleep(3)

        futures.append(executor.submit(notify_task))

        [f.result() for f in futures]

        assert len(results) == 3

Conditionはコンストラクタヌでむンスタンスを取埗したすが、匕数を指定しない堎合は内郚的にRLockのむンスタンスを䜜成したす。

    condition = threading.Condition()

匕数を指定する堎合は、LockたたはRLockのむンスタンスを枡す必芁がありたす。

Condition#waitでスレッドを埅機させたす。Conditionに察する操䜜は、ロックを獲埗したうえで行う必芁がありたす。

    def wait_task() -> None:
        with condition:
            log("waiting...")

            condition.wait()

            log("wakeup")

            results[threading.current_thread().name] = "done"

そしおCondition#nofityたたはCondition#notify_allで埅機しおいるスレッドを起こすこずができたす。

    def notify_task() -> None:
        with condition:
            log("notify")

            condition.notify_all()

            log("done")

            results[threading.current_thread().name] = "done"

Condition#nofityではひず぀たたは指定した数のスレッドを、Condition#notify_allでは埅機しおいるスレッドすべおを起こすこずができたす。

暙準出力の結果はこちら。

[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting...
[2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting...
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done
[2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup
[2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup

ちなみに、Condition#wait_forずいう匕数に指定した関数の戻り倀がTrueになるずスレッドが起きるようにするAPIもあるようです。

Semaphore

Semaphoreは、ある範囲を同時に実行できるスレッドの数を制限する仕組みです。

Semaphore

サンプルコヌドはこちら。

def test_semaphore() -> None:
    semaphore = threading.Semaphore(2)

    results = {}

    def with_semaphore() -> None:
        log("acquire semaphore")

        with semaphore:
            log("enter semaphore")

            time.sleep(2)

            log("leave semaphore")

            results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))
        futures.append(executor.submit(with_semaphore))

        [f.result() for f in futures]

        assert len(results) == 4

Semaphoreは、コンストラクタヌに同時に実行できるスレッド数を指定しおむンスタンスを生成したす。ここでは2を指定しおいたす。

    semaphore = threading.Semaphore(2)

あずはLockやRLockのようにロックしたい範囲を指定しお䜿いたす。

    def with_semaphore() -> None:
        log("acquire semaphore")

        with semaphore:
            log("enter semaphore")

            time.sleep(2)

            log("leave semaphore")

            results[threading.current_thread().name] = "done"

今回は4぀のスレッドを実行しおいるのですが、最初に入った2぀のスレッドのどちらかが抜けるたでは3぀目、4぀目のスレッドはロックを
獲埗できず埅機しおいたす。

[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore

Event

Eventは、Eventずいうオブゞェクトを仲介しおあるスレッドがむベントを発信した時に、Eventからの通知を埅っおいるスレッドを起動する
しくみです。

Event

サンプルコヌドはこちら。

def test_event() -> None:
    event = threading.Event()

    results = {}

    def wait_event() -> None:
        log("wait...")

        event.wait()

        log("wake up")

        results[threading.current_thread().name] = "done"

    def set_event() -> None:
        log("before set event")

        time.sleep(2)
        
        event.set()

        log("after set event")

        results[threading.current_thread().name] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(wait_event))
        futures.append(executor.submit(wait_event))

        time.sleep(2)

        futures.append(executor.submit(set_event))

        [f.result() for f in futures]

        assert len(results) == 3

Eventの䜜成。

    event = threading.Event()

埅機するスレッドは、Event#waitで通知を埅ちたす。

    def wait_event() -> None:
        log("wait...")

        event.wait()

        log("wake up")

        results[threading.current_thread().name] = "done"

そしおむベントを送るスレッドは、Event#setで埅機しおいるスレッドをEvent#waitから抜けさせるこずができたす。

    def set_event() -> None:
        log("before set event")

        time.sleep(2)
        
        event.set()

        log("after set event")

        results[threading.current_thread().name] = "done"

実行結果。

[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait...
[2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait...
[2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up
[2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up

2぀のスレッドがEvent#setを埅っおいるこずがわかりたす。

Barrier

Barrierを䜿うず、耇数のスレッドの埅ち合わせができるようになりたす。

Barrier

サンプルコヌドはこちら。

def test_barrier() -> None:
    barrier = threading.Barrier(3)

    results = {}

    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)

        barrier.wait()
        log("thread1 wakeup")

        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)

        barrier.wait()
        log("thread2 wakeup")

        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)

        barrier.wait()
        log("thread3 wakeup")

        results["thread3"] = "done"

    with ThreadPoolExecutor() as executor:
        futures = []

        futures.append(executor.submit(thread1))
        futures.append(executor.submit(thread2))
        futures.append(executor.submit(thread3))

        [f.result() for f in futures]

        assert results["thread1"] == "done"
        assert results["thread2"] == "done"
        assert results["thread3"] == "done"

Barrierは、コンストラクタヌに埅ち合わせるスレッドの数を指定しおむンスタンスを生成したす。

    barrier = threading.Barrier(3)

あずはBarrier#waitを呌び出すずそこで埅機し、コンストラクタヌに指定した数のスレッドがBarrier#waitの呌び出しに到達するず
動き始めたす。

    def thread1() -> None:
        log("thread1 waiting 3sec...")
        time.sleep(3)

        barrier.wait()
        log("thread1 wakeup")

        results["thread1"] = "done"

    def thread2() -> None:
        log("thread2 waiting 2sec...")
        time.sleep(2)

        barrier.wait()
        log("thread2 wakeup")

        results["thread2"] = "done"

    def thread3() -> None:
        log("thread3 waiting 5sec...")
        time.sleep(5)

        barrier.wait()
        log("thread3 wakeup")

        results["thread3"] = "done"

぀たり、こういう動䜜結果になりたす。

[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec...
[2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup

最埌のスレッドがBarrier#waitを呌び出すたで他のスレッドが埅機し、最埌のスレッドがBarrier#waitを呌び出したずころで埅機しおいた
スレッドすべおが䞀気に動き出したす。

Timer

最埌はTimerです。これは、指定した時間の埌にタスクを実行する仕組みですね。

Timer

サンプルコヌドはこちら。スレッドの埅ち合わせにはBarrierを䜿いたした。

def test_timer() -> None:
    results = {}

    barrier = threading.Barrier(2)
    
    def task() -> None:
        log("execute task")
        
        results[threading.current_thread().name] = "done"

        barrier.wait()
        
    log("register task")

    timer = threading.Timer(3, task)
    timer.start()

    barrier.wait()

    assert len(results) == 1

Timerは、コンストラクタヌにタスクを起動するたでの秒数ず起動するタスクを関数ずしお指定したす。

    timer = threading.Timer(3, task)

実行結果はこちら。3秒埌にタスクが実行されおいたす。

[2024-11-16 21:18:39] MainThread - register task
[2024-11-16 21:18:42] Thread-1 - execute task

こんなずころでしょうか。

おわりに

Pythonでスレッドに関する暙準ラむブラリヌをいろいろ詊しおみたした。

䜿う頻床はそう倚くないず思いたすが、マルチスレッドを扱う時には抌さえおおいた方がよさそうなものばかりなので芚えおおきたしょう。