Skip to content

並列処理

TLDR: Python の組み込み multiprocessing モジュールと Polars を一緒に使用して並列処理に関する Polars エラーが発生する場合、開始方法として fork ではなく spawn を使用していることを確認してください:

from multiprocessing import get_context


def my_fun(s):
    print(s)


with get_context("spawn").Pool() as pool:
    pool.map(my_fun, ["input1", "input2", ...])

並列処理を使用しない場合

詳細に入る前に、Polars は最初からすべての CPU コアを使用するように構築されていることを強調することが重要です。 これは、並行して実行できる計算を別々のスレッドで実行することによって実現されます。 例えば、select 文で 2 つの式を要求することは並行して行うことができ、結果は最後にのみ結合されます。 別の例としては、group_by().agg(<expr>) を使用してグループ内で値を集約する場合、各グループは別々に評価することができます。 これらの場合に multiprocessing モジュールがコードのパフォーマンスを向上させる可能性は非常に低いです。

最適化についてもっと知りたい場合は最適化の章 を参照してください。

並列処理を使用しない場合

Polars はマルチスレッドですが、他のライブラリはシングルスレッドかもしれません。 他のライブラリがボトルネックで、解決の手がかりが並列化可能な場合、並列処理を使用して速度を上げることは理にかなっています。

デフォルトの並列処理の設定の問題

概要

Python の並列処理のドキュメント ではプロセスプールを作成する3つの方法が記載されています:

  1. spawn
  2. fork
  3. forkserver

fork の説明は (2022-10-15 時点):

親プロセスは os.fork() を使用して Python インタープリターをフォークします。子プロセスは開始すると、親プロセスと実質的に同一です。親のすべてのリソースは子プロセスに継承されます。マルチスレッドプロセスを安全にフォークすることは問題があることに注意してください。

Unix でのみ利用可能。Unix のデフォルト。

端的に言うと:Polars は強力なパフォーマンスを提供する目的でマルチスレッドで処理します。 したがって、fork と組み合わせることはできません。 Unix (Linux、BSD など) を使用している場合、明示的にオーバーライドしない限り、fork を使用します。

この問題に以前遭遇していなかった理由は、純粋な Python コードやほとんどの Python ライブラリは(ほとんどが)シングルスレッドだからです。 または、Windows や MacOS を使用しているため、fork はそもそも利用可能な方法ではありません(MacOS は Python 3.7 まで)。

このため代わりに spawnforkservert を使うべきです。spawn はすべてのプラットフォームで使用可能で最も安全な選択のため、この方法が推奨されます。

fork の問題は、親プロセスの状態をコピーすることにあります。 Polars のイシュートラッカーに投稿されたものを少し変更した以下の例を考えてみてください:

import multiprocessing
import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    df_filtered = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")


def create_dataset():
    return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})


def setup():
    # some setup work
    df = create_dataset()
    df.write_parquet("/tmp/test.parquet")


def main():
    test_df = pl.read_parquet("/tmp/test.parquet")

    for i in range(0, 5):
        proc = multiprocessing.get_context("spawn").Process(
            target=test_sub_process, args=(test_df, i)
        )
        proc.start()
        proc.join()

        print(f"Executed sub process {i}")


if __name__ == "__main__":
    setup()
    main()

spawn の代わりに fork を使用すると、デッドロックが発生します。 注意:Polars は並列処理の方法が間違っているというエラーを出して起動すらしませんが、チェックがなかった場合、デッドロックします。

fork メソッドは os.fork() を呼び出すことに相当し、これは POSIX 標準 で定義されているシステムコールです:

プロセスは単一のスレッドで作成されます。マルチスレッドプロセスが fork() を呼び出した場合、新しいプロセスには呼び出しスレッドとその完全なアドレス空間のレプリカが含まれます。これには、ミューテックスの状態などのリソースも含まれる可能性があります。したがって、エラーを避けるために、子プロセスは exec 関数のいずれかが呼び出されるまで、非同期シグナル安全な操作のみを実行することができます。

一方で spawn は完全に新しいフレッシュな Python インタープリターを作成し、ミューテックスの状態を継承しません。

では、コード例で何が起こるのでしょうか? ファイルを pl.read_parquet で読むためには、ファイルをロックする必要があります。 その後、os.fork() が呼び出され、親プロセスの状態をコピーします。これにはミューテックスも含まれます。 したがって、すべての子プロセスは、獲得された状態でファイルロックをコピーし、ファイルロックが解放されるのを無期限に待つことになりますが、それは決して起こりません。

これらの問題をデバッグするのが難しいのは、fork が機能する可能性があるためです。 pl.read_parquet の呼び出しがない例に変更してみてください:

import multiprocessing
import polars as pl


def test_sub_process(df: pl.DataFrame, job_id):
    df_filtered = df.filter(pl.col("a") > 0)
    print(f"Filtered (job_id: {job_id})", df_filtered, sep="\n")


def create_dataset():
    return pl.DataFrame({"a": [0, 2, 3, 4, 5], "b": [0, 4, 5, 56, 4]})


def main():
    test_df = create_dataset()

    for i in range(0, 5):
        proc = multiprocessing.get_context("fork").Process(
            target=test_sub_process, args=(test_df, i)
        )
        proc.start()
        proc.join()

        print(f"Executed sub process {i}")


if __name__ == "__main__":
    main()

これは問題なく機能します。 一見関係のない変更が並列処理のコードを壊す可能性があるため、ここでの簡単な例ではなく、より大きなコードベースでこれらの問題をデバッグすることは、大変な苦痛を伴う可能性があります。 従って、やむを得ない特別な要件があるとき以外は、一般的にはマルチスレッドのライブラリを使用する場合は fork の開始方法を使用するべきでありません。

fork の長所と短所

例を踏まえると、なぜ fork が Python で最初から利用可能だったのか疑問に思うかもしれません。

まず、おそらく歴史的な理由からです:spawn は Python バージョン 3.4 で追加されましたが、fork は Python 2.x シリーズから一部でした。

2つ目に、spawn および forkserver には適用されないいくつかの制限が fork には存在します。特に、すべての引数がピック可能である必要があります。 詳細については、Python の並列処理のドキュメント を参照してください。

3つ目に、spawn は実質的に fork に加えて新しい Python プロセスをロックなしで作成する execv の呼び出しのため、forkspawn よりも早く新しいプロセスを作成します。 そのため、Python ドキュメントにはより遅いとの警告があります:spawn よりもオーバーヘッドが多いです。 しかし、ほとんどの場合、複数のプロセスを使用する目的は、数分または数時間かかる計算を速めることであり、このオーバーヘッドは全体的な状況では無視できるほどです。 そしてより重要なことに、それはマルチスレッドライブラリと組み合わせて実際に機能します。

4つ目に、spawn は新しいプロセスを開始するため、fork と違ってコードがインポート可能である必要があります。 特に spawn を使用する場合、関連するコードは例えば Jupyter ノートブックやプレーンなスクリプトなどのグローバルなスコープにあるべきではありません。 したがって、上記の例では、main 節から実行する関数内でスポーンするように関数を定義しています。 これは典型的なプロジェクトでは問題ではありませんが、ノートブックでの迅速な実験においては失敗する可能性があります。

参考文献

  1. https://docs.python.org/3/library/multiprocessing.html

  2. https://pythonspeed.com/articles/python-multiprocessing/

  3. https://pubs.opengroup.org/onlinepubs/9699919799/functions/fork.html

  4. https://bnikolic.co.uk/blog/python/parallelism/2019/11/13/python-forkserver-preload.html