ロゴ

【Python】とりあえず並列処理を実装するためのMultiprocessing解説

表紙

pythonの処理が遅い...だけどCPU使用率は低い...。それはPythonの処理をシングルプロセスで動かしているからです。

OS(Windows, Linux...)にとって一つのタスクは一つのプロセスとして処理します。例えると、一つブラウザーを開くと一つのプロセスが作成されます。そのあとに一つゲームを開くと、また一つプロセスが作成されます。

ですが一つのプロセッサは一つのプロセスしか処理できません。順番に処理することしかできないのです。そうすると何個も何個もプロセスを作っていくと、処理がとても遅くなってしまいますよね。

それは困るので最近のCPUには複数のプロセッサが搭載されています。「複数のプロセスを複数のプロセッサで処理すれば遅くならないよね。」ってことです。

pythonの処理を複数のプロセスにして並列処理する方法の一つが「multiprocessingモジュール」です。

これを使うと、複数のプロセス(サブプロセス)を新しく生成し、複数のプロセッサに分散させて並列に処理することができます。


今回は「multiprocessingモジュール」を使った並列処理のやり方を説明します。

Multiprocessing

MultiprocessingはPythonの標準ライブラリなので、pip等でのインストールは不要です。

インポートは次のコードをプログラムの最初に書くことでできます。

インポート
from multiprocessing import XXX

XXXの部分は使うクラス名が入ります。使うクラスは次の章でそれぞれ解説します。

並列処理をやる

それでは早速pythonで並列処理を使う方法を解説します。

今回は「とりあえず使えるようになる」ことを目標にしているので、詳しいことは説明しません。

プログラム例や、もっと詳しい使い方はほかの記事をご覧ください。

プロセス(Process)

Processクラスを使って、簡単にサブプロセスを作成することができます。

インポート
from multiprocessing import Process

Processクラスのインポートは上のコードでできます。

使い方
<変数名> = Process(target=<実行する関数名>, args=(<引数1>,<引数2>,))
<変数名>.start()

targetに実行したい関数の名前を指定します。かっこはつけないでください。

argsにはかっこを付けて、かっこの中には引数を入れてください。必ずかっこの中の最後にカンマ(, )を入れてください。

サンプルコード
from multiprocessing import Process

# 子プロセスが実行する処理
def sub_proc(name):
    print(f"    running sub process {name}...")


# === 開始 ===

proc = Process(target=sub_proc, args=('SUB',)) # インスタンス化

print("subprocess will start")
proc.start() # サブプロセスの開始
print("subprocess has been started")

print(f"    runnnig main process MAIN...")

print("wait for subprocess to terminate")
proc.join() # サブプロセスの終了を待つ。
print("subprocess has been terminated")

# === 終了 === 
出力
subprocess will start
subprocess has been started
    runnnig main process MAIN...
wait for subprocess to terminate
    running sub process SUB...
subprocess has been terminated

出力を見ると、サブプロセスを開始(.start())した後も、そのあとの処理が行われていることがわかります。

ですが、.join()の後は処理されていません。

また、サブプロセス内でprint()をしても、.join()をするまで表示されないことがわかります。

Processに実行したい関数と引数を与えたインスタンスを作成して.start()でサブプロセスを開始します。.join()が来るまでメインプロセスでは処理が継続されます。その間もサブプロセスでも処理が行われています。

.join()があるとメインプロセスはサブプロセスが終了するまで停止します。また、子プロセスの戻り値をここで受け取ります。

なので先ほどのコードは、関数が.start()された時にprint()が表示されず、.join()が実行されたときにprint()が実行されたのです。

使い方の例

元のコード
import random
import time

def proc(): # 時間のかかる処理
    wait=random.random()*3
    time.sleep(wait)
    print(f"proc wait for {wait}")

t0 = time.perf_counter()
print(" === 開始 === ")

proc() # ここを変える!!

wait=random.random()*3
time.sleep(wait)
print(f"wait for {wait}")

print(" === 終了 === ")
t = time.perf_counter()
print(f"{t-t0} sec")
出力結果:
 === 開始 === 
proc wait for 2.368812886043213
wait for 2.6144671053610162
=== 終了 === 
4.988699100998929 sec
並列処理にしたコード
import random
import time
from multiprocessing import Process # インポート

def proc(): # 時間のかかる処理
wait=random.random()*3
    time.sleep(wait)
    print(f"proc wait for {wait}")

t0 = time.perf_counter()
print(" === 開始 === ")

process=Process(target=proc) # 変更後 #1
process.start()              #        #2

wait=random.random()*3
time.sleep(wait)
print(f"wait for {wait}")

process.join()               #        #3

print(" === 終了 === ")
t = time.perf_counter()
print(f"{t-t0} sec")
出力結果:
 === 開始 === 
proc wait for 1.5502454634814107
wait for 2.4976007878679587
 === 終了 === 
2.5082451149937697 sec

並列処理にしたことで、元のコードの約半分の処理時間で処理することが出来ました。

これがmultiprocessingの基本の使い方、Processです。

プロセスプール(Process Pool)

先ほどのProcessクラスではサブプロセスを一つ一つ作っていました。

しかし、ループ処理のように何回も同じ関数を呼び出す場合は、一つ一つサブプロセスを作るのは大変です。

そんな時に使えるのがプロセスプールです。for文で関数を実行したい回数プールを呼び出すと、ループの回数分サブプロセスを作成してくれます。

また、作成するサブプロセスの動作をメゾットで指定できます。以下は一例です。

メゾット簡単な説明
apply同期処理 前の処理が終わまで待つ
apply_async非同期処理 自動で最大のプロセスを作成
close全てのサブプロセスの終了を待つ
terminateすべてのサブプロセスの強制終了
joinサブプロセス内のprint等はここで表示される。必ずcloseかterminateの後に入れる

インポート
from multiprocessing import Pool

これでプロセスプールを実行できるようになります。

使い方
# 同期処理apply
p = Pool(4) # 同時に処理するサブプロセス数
for i in range(<回数>):
    <変数名> = p.apply(<関数名>, args=(<引数1>, <引数2>,))

# 非同期処理apply_async
p = Pool(4) # 同時に処理するサブプロセス数
for i in range(<回数>):
    <変数名> = p.apply_async(<関数名>, args=(<引数1>, <引数2>,))

Processクラスのようにスタートの合図はありません。自動で開始されます。

並列処理ができるように書いていますが、同期処理applyの場合は、並列では処理されません。

前の処理が終わるまでroopは次に行きません。注意してください。

サンプルコード
from multiprocessing import Pool

def func(cnt):
    print(f"process {cnt}")

p = Pool() # 名にも指定しなければ自動で最大数
for i in range(5):
    proc = p.apply_async(func, args=(i,))

p.close() # 終わるのを待つ
p.join() # 各プロセスの出力を得る
出力結果:
process 0
process 1
process 3
process 2
process 4

使い方の例

元のコード
import random
import time

def task(): # 時間のかかる処理
    time.sleep(random.random()*3)

t0 = time.perf_counter()
for i in range(5):
    task()              # ここを変える!
t = time.perf_counter()
print(f"{t-t0} sec")
出力結果
8.40565775000141 sec
並列処理にしたコード
import random
import time
from multiprocessing import Pool # インポート

def task(): # 時間のかかる処理
    time.sleep(random.random()*3)

t0 = time.perf_counter()
p = Pool()                      # 変更後 #1
for i in range(5):
    proc = p.apply_async(task)  #        #2
t = time.perf_counter()
print(f"{t-t0} sec")
出力結果:
0.04439821298001334 sec

かなり早くなりましたね!

Poolクラスを使うことで、forループ内の関数を高速化できました。

まとめ

今回は、とりあえずpythonで並列処理を使えるようにする方法をとても簡単に説明しました。

これができるようになるだけで、pythonの処理が遅いという欠点を少し薄めることができます。

今回は関数からの戻り値などの受け取り方を説明しませんでした。もっと詳しい使い方はほかの記事で紹介します。お待ちください!


この記事があなたのプログラミングライフに役立つことを願います。

最後までご覧いただきありがとうございます!ではでは~