Skip to content

ストリーミングインターフェース

lazy APIの追加の利点の1つは、クエリをストリーミング方式で実行できることです。Polarsは一度に全てのデータを処理するのではなく、バッチ処理することで、メモリ以上のデータセットを処理できます。

Polarsにクエリをストリーミングモードで実行させるには、collectメソッドにstreaming=True引数を渡します。

collect

q1 = (
    pl.scan_csv("docs/data/iris.csv")
    .filter(pl.col("sepal_length") > 5)
    .group_by("species")
    .agg(pl.col("sepal_width").mean())
)
df = q1.collect(streaming=True)

collect · Available on feature streaming

let q1 = LazyCsvReader::new("docs/data/iris.csv")
    .has_header(true)
    .finish()?
    .filter(col("sepal_length").gt(lit(5)))
    .group_by(vec![col("species")])
    .agg([col("sepal_width").mean()]);

let df = q1.clone().with_streaming(true).collect()?;
println!("{}", df);

いつストリーミングが利用可能か

ストリーミングは現在開発中です。Polarsに任意のlazyクエリをストリーミングモードで実行させることができます。ただし、すべてのlazyオペレーションがストリーミングをサポートしているわけではありません。ストリーミングがサポートされていないオペレーションがある場合、Polarsはノンストリーミングモードでクエリを実行します。

ストリーミングは以下のようなオペレーションでサポートされています:

  • filter,slice,head,tail
  • with_columns,select
  • group_by
  • join
  • unique
  • sort
  • explode,melt
  • scan_csv,scan_parquet,scan_ipc

この一覧は完全なものではありません。Polarsは活発に開発が進められており、明示的な通知なしに、より多くのオペレーションがストリーミングに対応される可能性があります。

ストリーミングをサポートするオペレーションの例

クエリのどの部分がストリーミングされるかを確認するには、explainメソッドを使用します。以下の例では、クエリプランの検査方法を示しています。クエリプランの詳細については、Lazy APIの章を参照してください。

explain

print(q1.explain(streaming=True))

explain

let query_plan = q1.with_streaming(true).explain(true)?;
println!("{}", query_plan);

--- STREAMING
AGGREGATE
    [col("sepal_width").mean()] BY [col("species")] FROM

    Csv SCAN docs/data/iris.csv
    PROJECT 3/5 COLUMNS
    SELECTION: [(col("sepal_length")) > (5.0)]  --- END STREAMING

  DF []; PROJECT */0 COLUMNS; SELECTION: "None"

ストリーミングをサポートしないオペレーションの例

explain

q2 = pl.scan_csv("docs/data/iris.csv").with_columns(
    pl.col("sepal_length").mean().over("species")
)

print(q2.explain(streaming=True))

explain

let q2 = LazyCsvReader::new("docs/data/iris.csv")
    .finish()?
    .with_columns(vec![col("sepal_length")
        .mean()
        .over(vec![col("species")])
        .alias("sepal_length_mean")]);

let query_plan = q2.with_streaming(true).explain(true)?;
println!("{}", query_plan);

 WITH_COLUMNS:
 [col("sepal_length").mean().over([col("species")])]
  --- STREAMING

  Csv SCAN docs/data/iris.csv
  PROJECT */5 COLUMNS  --- END STREAMING

    DF []; PROJECT */0 COLUMNS; SELECTION: "None"