ストリーミングインターフェース
lazy APIの追加の利点の1つは、クエリをストリーミング方式で実行できることです。Polarsは一度に全てのデータを処理するのではなく、バッチ処理することで、メモリ以上のデータセットを処理できます。
Polarsにクエリをストリーミングモードで実行させるには、collect
メソッドにstreaming=True
引数を渡します。
q1 = (
pl.scan_csv("docs/data/iris.csv")
.filter(pl.col("sepal_length") > 5)
.group_by("species")
.agg(pl.col("sepal_width").mean())
)
df = q1.collect(streaming=True)
collect
· Available on feature streaming
let q1 = LazyCsvReader::new("docs/data/iris.csv")
.has_header(true)
.finish()?
.filter(col("sepal_length").gt(lit(5)))
.group_by(vec![col("species")])
.agg([col("sepal_width").mean()]);
let df = q1.clone().with_streaming(true).collect()?;
println!("{}", df);
いつストリーミングが利用可能か
ストリーミングは現在開発中です。Polarsに任意のlazyクエリをストリーミングモードで実行させることができます。ただし、すべてのlazyオペレーションがストリーミングをサポートしているわけではありません。ストリーミングがサポートされていないオペレーションがある場合、Polarsはノンストリーミングモードでクエリを実行します。
ストリーミングは以下のようなオペレーションでサポートされています:
filter
,slice
,head
,tail
with_columns
,select
group_by
join
unique
sort
explode
,melt
scan_csv
,scan_parquet
,scan_ipc
この一覧は完全なものではありません。Polarsは活発に開発が進められており、明示的な通知なしに、より多くのオペレーションがストリーミングに対応される可能性があります。
ストリーミングをサポートするオペレーションの例
クエリのどの部分がストリーミングされるかを確認するには、explain
メソッドを使用します。以下の例では、クエリプランの検査方法を示しています。クエリプランの詳細については、Lazy APIの章を参照してください。
--- STREAMING
AGGREGATE
[col("sepal_width").mean()] BY [col("species")] FROM
Csv SCAN docs/data/iris.csv
PROJECT 3/5 COLUMNS
SELECTION: [(col("sepal_length")) > (5.0)] --- END STREAMING
DF []; PROJECT */0 COLUMNS; SELECTION: "None"
ストリーミングをサポートしないオペレーションの例
q2 = pl.scan_csv("docs/data/iris.csv").with_columns(
pl.col("sepal_length").mean().over("species")
)
print(q2.explain(streaming=True))
let q2 = LazyCsvReader::new("docs/data/iris.csv")
.finish()?
.with_columns(vec![col("sepal_length")
.mean()
.over(vec![col("species")])
.alias("sepal_length_mean")]);
let query_plan = q2.with_streaming(true).explain(true)?;
println!("{}", query_plan);
WITH_COLUMNS:
[col("sepal_length").mean().over([col("species")])]
--- STREAMING
Csv SCAN docs/data/iris.csv
PROJECT */5 COLUMNS --- END STREAMING
DF []; PROJECT */0 COLUMNS; SELECTION: "None"