(最終更新日:2023年7月)
✔このような方へ向けて書かれた記事となります
「Pythonでのマルチスレッド処理ってどのように実装するの?」
「threadingモジュールの使い方を詳しく知りたい」
「Pythonによるマルチスレッドプログラミングの具体例を見たい」
✔当記事を通じてお伝えすること
- Pythonでのマルチスレッド処理の基本
- threadingモジュールの使い方とその応用
- Pythonマルチスレッドプログラミングの実例
当記事では、Pythonでのマルチスレッド処理の基本から、threadingモジュールの正しい使い方や応用方法、さらには具体的な実例を詳しく解説しています。
ぜひ最後までご覧ください。
スレッドの基本と使用方法
ここでは、Pythonのスレッドについて説明し、その基本的な使用方法について学んでいきます。
基本を理解したうえで、先の応用した使い方へ進んでください。
- Pythonのスレッドとは
- threadingモジュールの概要
- Threadオブジェクトの作成とカスタマイズ
- スレッドローカルデータの使用
Pythonのスレッドとは
スレッドとは、プログラム内で並行にタスクを実行するための手段のこと。
一部のタスクが待機中でも、ほかのタスクが進行し、プログラム全体の効率が向上します。
Pythonでは、threading
モジュールを用いて簡単にスレッドを扱えます。
例えば以下のようなコードで、2つのスレッドを同時に動作し、結果をコンソールに出力可能です。
import threading
import time
def print_numbers():
for i in range(10):
time.sleep(1)
print(i)
def print_letters():
for letter in 'abcdefghij':
time.sleep(1.5)
print(letter)
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters)
thread1.start()
thread2.start()
print_numbers
関数とprint_letters
関数が別々のスレッドで並行に実行され、数字と文字が交互に出力されます。
threadingモジュールの概要
Pythonのthreading
モジュールは、高レベルのスレッドインターフェースを提供します。
このモジュールを使用すると、以下のような操作が可能です。
- スレッドの生成
- スレッドの起動
- スレッドの結合
- スレッドの同期化
具体的には、Thread
クラスを用いて新たなスレッドを生成し、そのstart
メソッドを呼び出すことでスレッドを開始できます。
新たなスレッドの作成は、Thread
クラスをインスタンス化し、そのtarget
引数にスレッドで実行する関数を指定しましょう。
import threading
# スレッドで実行する関数
def print_numbers():
for i in range(1, 6):
print(i)
# 新しいスレッドを生成して実行
thread = threading.Thread(target=print_numbers)
thread.start()
# メインスレッドの処理
for i in range(6, 11):
print(i)
Threadオブジェクトの作成とカスタマイズ
新たなスレッドは、t
hreading.Thread
クラスをインスタンス化することで作成します。
インスタンス化の際には、以下の引数に適切な値を入れましょう。
target
引数:スレッドで実行する関数args
引数:その関数の引数をタプルとして指定
例えば以下のようなコードでは、Thread
クラスを使って関数print_numbers
とprint_letters
をそれぞれのスレッドで実行しています。
def print_numbers():
for i in range(10):
print(i)
def print_letters(letter):
for _ in range(10):
print(letter)
thread1 = threading.Thread(target=print_numbers)
thread2 = threading.Thread(target=print_letters, args=('A',))
thread1.start()
thread2.start()
thread2は、引数として受け取った文字を表示するprint_letters関数を実行しています。
args
引数において、タプル形式でその関数へ引数を指定するため、引数がひとつでも、カンマをつけてタプル形式にすることを忘れないようにしましょう。
また、Thread
クラスはサブクラス化することも可能。
run
メソッドをオーバーライドすることでカスタマイズできます。
以下がその例です。
class MyThread(threading.Thread):
def run(self):
print("Custom thread starting.")
time.sleep(3)
print("Custom thread finishing.")
thread = MyThread()
thread.start()
このコードでは、MyThread
のインスタンスを作成し、start
メソッドを呼び出すと、カスタマイズしたrun
メソッドが実行されます。
スレッドローカルデータの使用
スレッドローカルデータとは、各スレッドがそれぞれ独立に所有するデータのことです。
threading.local
関数を用いることで、スレッドローカルデータのインスタンスを作成できます。
以下は、各スレッドにユニークなデータを持たせる例です。
import threading
def worker(num):
thread_data.value = num
print(f"Thread {num} has data: {thread_data.value}")
thread_data = threading.local()
for i in range(5):
thread = threading.Thread(target=worker, args=(i,))
thread.start()
各スレッドは、それぞれ独自のthread_data.value
を持ち、互いに影響を与えることなく値を操作できます。
スレッド間でデータが混乱することなく、各スレッドが自分自身のデータを効果的に管理できるのです。
スレッド制御と同期
次に、スレッドの制御と同期について学んでいきます。
- デーモンスレッドとGIL
- 排他制御(Lockオブジェクト)
- 再帰的排他制御(RLockオブジェクト)
- セマフォ(Semaphoreオブジェクト)
- イベント(Eventオブジェクト)
- タイマー (Timerオブジェクト)
- バリア(Barrierオブジェクト)
- 条件変数(Conditionオブジェクト)
デーモンスレッドとGIL
デーモンスレッドは、プログラムの終了をブロックしない特別なスレッドです。
Thread
オブジェクトのdaemon
属性をTrue
に設定すると、そのスレッドはデーモンスレッドとなります。
デーモンスレッドは、メインスレッドが終了すると同時に強制的に終了します。
def daemon_thread():
while True:
time.sleep(1)
print("Daemon thread is running.")
thread = threading.Thread(target=daemon_thread)
thread.daemon = True
thread.start()
time.sleep(5)
print("Main thread is finished.")
メインスレッドが終了すると、無限ループを持つデーモンスレッドも同時に終了します。
またGIL(Global Interpreter Lock)は、Pythonのインタープリタが同時に複数のスレッドからアクセスされることを防ぐ仕組み。
このため計算処理が多いタスクでは、マルチスレッドが期待するほどのパフォーマンス向上をもたらさない場合があります。
排他制御(Lockオブジェクト)
排他制御は、複数のスレッドが同時に共有データを変更してしまうことを防ぐための技術です。
threading.Lock
オブジェクトは最も基本的な排他制御のツール。
一度にひとつのスレッドのみがロックを取得でき、ロックを持つスレッドのみが特定のコードブロックを実行できます。
lock = threading.Lock()
data = 0
def worker():
global data
for _ in range(10000):
with lock:
data += 1
threads = [threading.Thread(target=worker) for _ in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(f"data: {data}")
複数のスレッドが共有データdata
を同時に変更しようとするときに、Lock
オブジェクトでそのアクセスを制御しています。
結果として、data
への変更が競合せず、正確な結果が得られるのです。
再帰的排他制御(RLockオブジェクト)
threading.RLock
オブジェクトは、Lock
オブジェクトと同様に排他制御を提供しますが、同一スレッド内でのロック再入を許可します。
同一スレッド内で複数のロックを取得する必要がある場合や、再帰的な関数呼び出しをする際などに有用です。
rlock = threading.RLock()
def recursive_worker(level):
with rlock:
if level:
print(f"Entering level {level}")
recursive_worker(level - 1)
print(f"Leaving level {level}")
thread = threading.Thread(target=recursive_worker, args=(5,))
thread.start()
RLock
を用いて、同一スレッド内でロックの再入をおこなっています。
再帰的な関数recursive_worker
内で同じロックを複数回取得できるのです。
セマフォ(Semaphoreオブジェクト)
セマフォは、一度に特定数のスレッドのみがコードを実行できるよう制限するツールです。
threading.Semaphore
オブジェクトは、初期値として許可されるスレッドの数を取り、その数だけスレッドが同時にコードブロックを実行できます。
以下のコードは、同時に2つのスレッドだけがリソースを利用できるよう、セマフォを設定した例です。
semaphore = threading.Semaphore(2)
def access_resource(num):
print(f'Thread {num} is waiting for the resource.')
semaphore.acquire()
print(f'Thread {num} starts using the resource.')
time.sleep(1)
print(f'Thread {num} is done using the resource.')
semaphore.release()
threads = []
for i in range(5):
thread = threading.Thread(target=access_resource, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
一度に2つのスレッドだけがリソース(ここではsleep
関数)を利用し、残りのスレッドはリソースが解放されるのを待つことになります。
イベント(Eventオブジェクト)
threading.Eventオブジェクトは、スレッド間のシンプルな通信を実現します。
特定のイベントが発生したことを、ほかのスレッドに通知するために使用するものです。
イベントオブジェクトは、内部フラグを持っており、以下のメソッドで状態を設定できます。
set
メソッド:フラグがTrue
clear
メソッド:フラグがFalse
wait
メソッドを使って、フラグがTrue
になるのを待てます。
Event
オブジェクトを用いた例はこちらです。
event = threading.Event()
def wait_for_event():
print('Waiting for the event to be set.')
event.wait()
print('The event has been set.')
def set_event():
time.sleep(2)
print('Setting the event.')
event.set()
thread1 = threading.Thread(target=wait_for_event)
thread2 = threading.Thread(target=set_event)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
wait_for_event
関数がイベントがセットされるまで待ち、set_event
関数がそのイベントをセットします。
このようにして、イベントオブジェクトは、特定のイベントが発生したことを他のスレッドに通知できるのです。
タイマー (Timerオブジェクト)
threading.Timer
は、指定した時間が経過した後に関数を実行するためのスレッドです。
タイマーは一度だけ実行されますが、必要に応じてリセットできます。
以下のコードは、2秒後にメッセージを表示するタイマーを作成する例です。
def display_message():
print('Hello, Timer!')
timer = threading.Timer(2.0, display_message)
timer.start()
2秒後に”Hello, Timer!”と表示されます。
バリア(Barrierオブジェクト)
threading.Barrier
は、特定数のスレッドがバリア地点に達するまで、全スレッドの進行を阻止するもの。
全スレッドが同じ地点で開始することを保証できます。
3つのスレッドが全てバリア地点に達したときに、それぞれがメッセージを表示する例です。
barrier = threading.Barrier(3)
def worker(num):
print(f'Thread {num} is waiting at the barrier.')
barrier.wait()
print(f'Thread {num} has crossed the barrier.')
threads = []
for i in range(3):
thread = threading.Thread(target=worker, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
各スレッドがバリアに到達し、全スレッドがバリアを通過したときに、それぞれがメッセージを表示します。
条件変数の使い方
条件変数は、ある条件が満たされるまでスレッドが待機することを可能にするものです。
Condition
オブジェクトを使ったサンプルコードを示します。
from threading import Thread, Condition
import time
# 商品を格納するリスト
products = []
# 条件オブジェクトの作成
condition = Condition()
# 生産者スレッド
class Producer(Thread):
def run(self):
global products
global condition
while True:
with condition:
if len(products) == 0:
print('生産者: 商品を生産しました')
products.append(1)
condition.notify()
time.sleep(2)
# 消費者スレッド
class Consumer(Thread):
def run(self):
global products
global condition
while True:
with condition:
if len(products) > 0:
print('消費者: 商品を消費しました')
products.pop(0)
else:
print('消費者: 商品がないため待機します')
condition.wait()
time.sleep(2)
# スレッドの生成
producer_thread = Producer()
consumer_thread = Consumer()
# スレッドの開始
producer_thread.start()
consumer_thread.start()
# スレッドの終了待ち
producer_thread.join()
consumer_thread.join()
producer
スレッドがデータを生成し、consumer
スレッドがそのデータを消費する構成です。
Condition
オブジェクトは、consumer
スレッドがデータが利用可能になるまで待つため、そしてproducer
スレッドがデータを追加したときにconsumer
スレッドに通知するために使われています。
プロセシング
ここではPythonでのプロセッシング、つまり複数のプロセスを用いた並列処理について学んでいきましょう。
- サブプロセスの実行
- multiprocessingモジュールの使い方
- concurrent.futuresの活用方法
サブプロセスの実行
サブプロセスの実行は、Pythonでほかのプログラムを呼び出す際によく使用されます。
これはsubprocessモジュールを使用しましょう。
以下にその基本的な使用例を示します。
import subprocess
# Runs the ls command, and waits for it to complete
result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
# Print the result
print('Returned result:', result.returncode)
print('stdout:', result.stdout.decode('utf-8'))
Unix系のシステムであるLinuxやMacで正常に動作しますが、Windowsではls
コマンドが存在しないため、動作しません。
multiprocessingモジュールの使い方
multiprocessingモジュールは、Pythonでマルチプロセッサを使って、並列処理を実現するためのモジュールです。
このモジュールを使うことで、PythonのスレッドがGILによる制約を受けずに、CPUコアをフルに活用できます。
以下が使用例です。
import multiprocessing
def worker(number):
print(f'Worker {number} is working.')
# Create and start 5 worker processes
for i in range(5):
multiprocessing.Process(target=worker, args=(i,)).start()
5つのプロセスを作成し、それぞれのプロセスでworker関数を実行します。
マルチコアのシステムであれば、5つのコアが同時に活用できるのです。
concurrent.futuresの活用方法
concurrent.futuresモジュールは、非同期処理をより高レベルで扱うためのモジュールです。
このモジュールにより、並列処理のタスクをExecutorと呼ばれるオブジェクトにスケジューリングし、その結果をFutureオブジェクトとして取得できます。
以下がその使用例です。
from concurrent.futures import ThreadPoolExecutor
def worker(number):
return f'Worker {number} has finished.'
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in futures:
print(future.result())
5つのタスクをスケジュールし、それぞれのタスクの結果を待って出力。
スレッドプールを使うことで、タスクのスケジューリングと結果の取得が大幅に簡単になります。
まとめ
当記事でお伝えしてきた内容は以下のとおりです。
- Pythonでのマルチスレッド処理の基本
- threadingモジュールの使い方とその応用
- Pythonマルチスレッドプログラミングの実例
Pythonのスレッド制御を理解すれば、処理を並列化し、より高速なプログラムを作成できます。
ただしスレッドは、正しく扱わないとデータの一貫性を損ねたり、予期しないバグを引き起こしたりする可能性があります。
排他制御や条件変数などの同期プリミティブを理解し、適切に使用することが重要です。
threadingモジュールだけでなく、ほかのモジュールについても詳しくなることで、より複雑なプログラムが作れるようになります。
当サイトの他記事も参考に、手を動かして、覚えていきましょう。