【Python】threadingモジュールの基本|実例付きで解説

※本サイトにはプロモーション・広告が含まれています。

(最終更新日:2023年7月)

✔このような方へ向けて書かれた記事となります

「Pythonでのマルチスレッド処理ってどのように実装するの?」
「threadingモジュールの使い方を詳しく知りたい」
「Pythonによるマルチスレッドプログラミングの具体例を見たい」

✔当記事を通じてお伝えすること

  • Pythonでのマルチスレッド処理の基本
  • threadingモジュールの使い方とその応用
  • Pythonマルチスレッドプログラミングの実例

当記事では、Pythonでのマルチスレッド処理の基本から、threadingモジュールの正しい使い方や応用方法、さらには具体的な実例を詳しく解説しています。

ぜひ最後までご覧ください。

筆者プロフィール

筆者プロフィールアイコン

【現職】プロダクトマネージャー

【副業】ブログ(月間20万PV)/YouTube/Web・アプリ制作

「プログラミング × ライティング × 営業」の経験を活かし、30後半からのIT系職へシフト。現在はプロダクトマネージャーとして、さまざまな関係者の間に入り奮闘してます。当サイトでは、実際に手を動かせるWebアプリの開発を通じて、プログラミングはもちろん、IT職に必要な情報を提供していきます。

【当ブログで紹介しているサイト】

当サイトチュートリアルで作成したデモ版日報アプリ

Django × Reactで開発したツール系Webアプリ

✔人に見せても恥ずかしくないコードを書こう

「リーダブルコード」は、わかりやすく良いコードの定義を教えてくれる本です。

  • 見るからにきれいなコードの書き方
  • コードの分割方法
  • 変数や関数の命名規則

エンジニアのスタンダートとすべき基準を一から解説しています。

何回も読むのに値する本なので、ぜひ手にとって読んでみてください。

スレッドの基本と使用方法

ここでは、Pythonのスレッドについて説明し、その基本的な使用方法について学んでいきます。

基本を理解したうえで、先の応用した使い方へ進んでください。

  • Pythonのスレッドとは
  • threadingモジュールの概要
  • Threadオブジェクトの作成とカスタマイズ
  • スレッドローカルデータの使用

Pythonのスレッドとは

スレッドとは、プログラム内で並行にタスクを実行するための手段のこと。

一部のタスクが待機中でも、ほかのタスクが進行し、プログラム全体の効率が向上します。

Pythonでは、threadingモジュールを用いて簡単にスレッドを扱えます。

例えば以下のようなコードで、2つのスレッドを同時に動作し、結果をコンソールに出力可能です。

import threading
import time

def print_numbers():
    for i in range(10):
        time.sleep(1)
        print(i)

def print_letters():
    for letter in 'abcdefghij':
        time.sleep(1.5)
        print(letter)

thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)

thread1.start()
thread2.start()

print_numbers関数とprint_letters関数が別々のスレッドで並行に実行され、数字と文字が交互に出力されます。

threadingモジュールの概要

Pythonのthreadingモジュールは、高レベルのスレッドインターフェースを提供します。

このモジュールを使用すると、以下のような操作が可能です。

  • スレッドの生成
  • スレッドの起動
  • スレッドの結合
  • スレッドの同期化

具体的には、Threadクラスを用いて新たなスレッドを生成し、そのstartメソッドを呼び出すことでスレッドを開始できます。

新たなスレッドの作成は、Threadクラスをインスタンス化し、そのtarget引数にスレッドで実行する関数を指定しましょう。

import threading

# スレッドで実行する関数
def print_numbers():
    for i in range(1, 6):
        print(i)

# 新しいスレッドを生成して実行
thread = threading.Thread(target=print_numbers)
thread.start()

# メインスレッドの処理
for i in range(6, 11):
    print(i)

Threadオブジェクトの作成とカスタマイズ

新たなスレッドは、threading.Threadクラスをインスタンス化することで作成します。

インスタンス化の際には、以下の引数に適切な値を入れましょう。

  • target引数:スレッドで実行する関数
  • args引数:その関数の引数をタプルとして指定

例えば以下のようなコードでは、Threadクラスを使って関数print_numbersprint_lettersをそれぞれのスレッドで実行しています。

def print_numbers():
    for i in range(10):
        print(i)

def print_letters(letter):
    for _ in range(10):
        print(letter)

thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters, args=('A',))

thread1.start()
thread2.start()

thread2は、引数として受け取った文字を表示するprint_letters関数を実行しています。

args引数において、タプル形式でその関数へ引数を指定するため、引数がひとつでも、カンマをつけてタプル形式にすることを忘れないようにしましょう。

また、Threadクラスはサブクラス化することも可能。

runメソッドをオーバーライドすることでカスタマイズできます。

以下がその例です。

class MyThread(threading.Thread):
    def run(self):
        print("Custom thread starting.")
        time.sleep(3)
        print("Custom thread finishing.")

thread = MyThread()
thread.start()

このコードでは、MyThreadのインスタンスを作成し、startメソッドを呼び出すと、カスタマイズしたrunメソッドが実行されます。

スレッドローカルデータの使用

スレッドローカルデータとは、各スレッドがそれぞれ独立に所有するデータのことです。

threading.local関数を用いることで、スレッドローカルデータのインスタンスを作成できます。

以下は、各スレッドにユニークなデータを持たせる例です。

import threading

def worker(num):
    thread_data.value = num
    print(f"Thread {num} has data: {thread_data.value}")

thread_data = threading.local()

for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    thread.start()

各スレッドは、それぞれ独自のthread_data.valueを持ち、互いに影響を与えることなく値を操作できます。

スレッド間でデータが混乱することなく、各スレッドが自分自身のデータを効果的に管理できるのです。

スレッド制御と同期

次に、スレッドの制御と同期について学んでいきます。

  • デーモンスレッドとGIL
  • 排他制御(Lockオブジェクト)
  • 再帰的排他制御(RLockオブジェクト)
  • セマフォ(Semaphoreオブジェクト)
  • イベント(Eventオブジェクト)
  • タイマー (Timerオブジェクト)
  • バリア(Barrierオブジェクト)
  • 条件変数(Conditionオブジェクト)

デーモンスレッドとGIL

デーモンスレッドは、プログラムの終了をブロックしない特別なスレッドです。

Threadオブジェクトのdaemon属性をTrueに設定すると、そのスレッドはデーモンスレッドとなります。

デーモンスレッドは、メインスレッドが終了すると同時に強制的に終了します。

def daemon_thread():
    while True:
        time.sleep(1)
        print("Daemon thread is running.")

thread = threading.Thread(target=daemon_thread)
thread.daemon = True
thread.start()
time.sleep(5)
print("Main thread is finished.")

メインスレッドが終了すると、無限ループを持つデーモンスレッドも同時に終了します。

またGIL(Global Interpreter Lock)は、Pythonのインタープリタが同時に複数のスレッドからアクセスされることを防ぐ仕組み。

このため計算処理が多いタスクでは、マルチスレッドが期待するほどのパフォーマンス向上をもたらさない場合があります。

排他制御(Lockオブジェクト)

排他制御は、複数のスレッドが同時に共有データを変更してしまうことを防ぐための技術です。

threading.Lockオブジェクトは最も基本的な排他制御のツール。

一度にひとつのスレッドのみがロックを取得でき、ロックを持つスレッドのみが特定のコードブロックを実行できます。

lock = threading.Lock()
data = 0

def worker():
    global data
    for _ in range(10000):
        with lock:
            data += 1

threads = [threading.Thread(target=worker) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(f"data: {data}")

複数のスレッドが共有データdataを同時に変更しようとするときに、Lockオブジェクトでそのアクセスを制御しています。

結果として、dataへの変更が競合せず、正確な結果が得られるのです。

再帰的排他制御(RLockオブジェクト)

threading.RLockオブジェクトは、Lockオブジェクトと同様に排他制御を提供しますが、同一スレッド内でのロック再入を許可します。

同一スレッド内で複数のロックを取得する必要がある場合や、再帰的な関数呼び出しをする際などに有用です。

rlock = threading.RLock()

def recursive_worker(level):
    with rlock:
        if level:
            print(f"Entering level {level}")
            recursive_worker(level - 1)
            print(f"Leaving level {level}")

thread = threading.Thread(target=recursive_worker, args=(5,))
thread.start()

RLockを用いて、同一スレッド内でロックの再入をおこなっています。

再帰的な関数recursive_worker内で同じロックを複数回取得できるのです。

セマフォ(Semaphoreオブジェクト)

セマフォは、一度に特定数のスレッドのみがコードを実行できるよう制限するツールです。

threading.Semaphoreオブジェクトは、初期値として許可されるスレッドの数を取り、その数だけスレッドが同時にコードブロックを実行できます。

以下のコードは、同時に2つのスレッドだけがリソースを利用できるよう、セマフォを設定した例です。

semaphore = threading.Semaphore(2)

def access_resource(num):
    print(f'Thread {num} is waiting for the resource.')
    semaphore.acquire()
    print(f'Thread {num} starts using the resource.')
    time.sleep(1)
    print(f'Thread {num} is done using the resource.')
    semaphore.release()

threads = []
for i in range(5):
    thread = threading.Thread(target=access_resource, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

一度に2つのスレッドだけがリソース(ここではsleep関数)を利用し、残りのスレッドはリソースが解放されるのを待つことになります。

イベント(Eventオブジェクト)

threading.Eventオブジェクトは、スレッド間のシンプルな通信を実現します。

特定のイベントが発生したことを、ほかのスレッドに通知するために使用するものです。

イベントオブジェクトは、内部フラグを持っており、以下のメソッドで状態を設定できます。

  • setメソッド:フラグがTrue
  • clearメソッド:フラグがFalse

waitメソッドを使って、フラグがTrueになるのを待てます。

Eventオブジェクトを用いた例はこちらです。

event = threading.Event()

def wait_for_event():
    print('Waiting for the event to be set.')
    event.wait()
    print('The event has been set.')

def set_event():
    time.sleep(2)
    print('Setting the event.')
    event.set()

thread1 = threading.Thread(target=wait_for_event)
thread2 = threading.Thread(target=set_event)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

wait_for_event関数がイベントがセットされるまで待ち、set_event関数がそのイベントをセットします。

このようにして、イベントオブジェクトは、特定のイベントが発生したことを他のスレッドに通知できるのです。

タイマー (Timerオブジェクト)

threading.Timerは、指定した時間が経過した後に関数を実行するためのスレッドです。

タイマーは一度だけ実行されますが、必要に応じてリセットできます。

以下のコードは、2秒後にメッセージを表示するタイマーを作成する例です。

def display_message():
    print('Hello, Timer!')

timer = threading.Timer(2.0, display_message)
timer.start()

2秒後に”Hello, Timer!”と表示されます。

バリア(Barrierオブジェクト)

threading.Barrierは、特定数のスレッドがバリア地点に達するまで、全スレッドの進行を阻止するもの。

全スレッドが同じ地点で開始することを保証できます。

3つのスレッドが全てバリア地点に達したときに、それぞれがメッセージを表示する例です。

barrier = threading.Barrier(3)

def worker(num):
    print(f'Thread {num} is waiting at the barrier.')
    barrier.wait()
    print(f'Thread {num} has crossed the barrier.')

threads = []
for i in range(3):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

各スレッドがバリアに到達し、全スレッドがバリアを通過したときに、それぞれがメッセージを表示します。

条件変数の使い方

条件変数は、ある条件が満たされるまでスレッドが待機することを可能にするものです。

Conditionオブジェクトを使ったサンプルコードを示します。

from threading import Thread, Condition
import time

# 商品を格納するリスト
products = []

# 条件オブジェクトの作成
condition = Condition()

# 生産者スレッド
class Producer(Thread):
    def run(self):
        global products
        global condition

        while True:
            with condition:
                if len(products) == 0:
                    print('生産者: 商品を生産しました')
                    products.append(1)
                    condition.notify()

            time.sleep(2)

# 消費者スレッド
class Consumer(Thread):
    def run(self):
        global products
        global condition

        while True:
            with condition:
                if len(products) > 0:
                    print('消費者: 商品を消費しました')
                    products.pop(0)
                else:
                    print('消費者: 商品がないため待機します')
                    condition.wait()

            time.sleep(2)

# スレッドの生成
producer_thread = Producer()
consumer_thread = Consumer()

# スレッドの開始
producer_thread.start()
consumer_thread.start()

# スレッドの終了待ち
producer_thread.join()
consumer_thread.join()

producerスレッドがデータを生成し、consumerスレッドがそのデータを消費する構成です。

Conditionオブジェクトは、consumerスレッドがデータが利用可能になるまで待つため、そしてproducerスレッドがデータを追加したときにconsumerスレッドに通知するために使われています。

プロセシング

ここではPythonでのプロセッシング、つまり複数のプロセスを用いた並列処理について学んでいきましょう。

  • サブプロセスの実行
  • multiprocessingモジュールの使い方
  • concurrent.futuresの活用方法

サブプロセスの実行

サブプロセスの実行は、Pythonでほかのプログラムを呼び出す際によく使用されます。

これはsubprocessモジュールを使用しましょう。

以下にその基本的な使用例を示します。

import subprocess

# Runs the ls command, and waits for it to complete
result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)

# Print the result
print('Returned result:', result.returncode)
print('stdout:', result.stdout.decode('utf-8'))

Unix系のシステムであるLinuxやMacで正常に動作しますが、Windowsではlsコマンドが存在しないため、動作しません。

multiprocessingモジュールの使い方

multiprocessingモジュールは、Pythonでマルチプロセッサを使って、並列処理を実現するためのモジュールです。

このモジュールを使うことで、PythonのスレッドがGILによる制約を受けずに、CPUコアをフルに活用できます。

以下が使用例です。

import multiprocessing

def worker(number):
    print(f'Worker {number} is working.')

# Create and start 5 worker processes
for i in range(5):
    multiprocessing.Process(target=worker, args=(i,)).start()

5つのプロセスを作成し、それぞれのプロセスでworker関数を実行します。

マルチコアのシステムであれば、5つのコアが同時に活用できるのです。

concurrent.futuresの活用方法

concurrent.futuresモジュールは、非同期処理をより高レベルで扱うためのモジュールです。

このモジュールにより、並列処理のタスクをExecutorと呼ばれるオブジェクトにスケジューリングし、その結果をFutureオブジェクトとして取得できます。

以下がその使用例です。

from concurrent.futures import ThreadPoolExecutor

def worker(number):
    return f'Worker {number} has finished.'

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = [executor.submit(worker, i) for i in range(5)]

for future in futures:
    print(future.result())

5つのタスクをスケジュールし、それぞれのタスクの結果を待って出力。

スレッドプールを使うことで、タスクのスケジューリングと結果の取得が大幅に簡単になります。

まとめ

当記事でお伝えしてきた内容は以下のとおりです。

  • Pythonでのマルチスレッド処理の基本
  • threadingモジュールの使い方とその応用
  • Pythonマルチスレッドプログラミングの実例

Pythonのスレッド制御を理解すれば、処理を並列化し、より高速なプログラムを作成できます。

ただしスレッドは、正しく扱わないとデータの一貫性を損ねたり、予期しないバグを引き起こしたりする可能性があります。

排他制御や条件変数などの同期プリミティブを理解し、適切に使用することが重要です。

threadingモジュールだけでなく、ほかのモジュールについても詳しくなることで、より複雑なプログラムが作れるようになります。

当サイトの他記事も参考に、手を動かして、覚えていきましょう。

タイトルとURLをコピーしました