Skip to content

クラウドストレージ

Polars は AWS S3、Azure Blob Storage、Google Cloud Storage の読み書きが可能です。 これらの3つのストレージプロバイダに対するAPIは同じです。

クラウドストレージから読み取る場合、ユースケースやクラウドストレージプロバイダによっては、追加の依存関係が必要になる可能性があります。

$ pip install fsspec s3fs adlfs gcsfs
$ cargo add aws_sdk_s3 aws_config tokio --features tokio/full

クラウドストレージからの読み込み

Polars は、eager モードで CSV、IPC、または Parquet ファイルをクラウドストレージから読み込むことができます。

read_parquet · read_csv · read_ipc

import polars as pl

source = "s3://bucket/*.parquet"

df = pl.read_parquet(source)

ParquetReader · CsvReader · IpcReader · Available on feature parquet · Available on feature ipc · Available on feature csv

use aws_config::BehaviorVersion;
use polars::prelude::*;

#[tokio::main]
async fn main() {
    let bucket = "<YOUR_BUCKET>";
    let path = "<YOUR_PATH>";

    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
    let client = aws_sdk_s3::Client::new(&config);

    let object = client
        .get_object()
        .bucket(bucket)
        .key(path)
        .send()
        .await
        .unwrap();

    let bytes = object.body.collect().await.unwrap().into_bytes();

    let cursor = std::io::Cursor::new(bytes);
    let df = CsvReader::new(cursor).finish().unwrap();

    println!("{:?}", df);
}

この eager クエリは、ファイルをメモリ内のバッファーにダウンロードし、そこから DataFrame を作成します。Polars は内部で fsspec を使ってこのダウンロードを管理しており、すべてのクラウドストレージプロバイダーに対応しています。

クラウドストレージからのクエリ最適化付きスキャン

Polars は、クラウドストレージから Parquet ファイルをレイジーモードでスキャンできます。ソースURLの他に、認証情報やストレージリージョンなどの詳細を提供する必要があるかもしれません。 Polarsは環境変数でこれらを検索しますが、 storage_options 引数として dict を渡すこともできます。

scan_parquet

import polars as pl

source = "s3://bucket/*.parquet"

storage_options = {
    "aws_access_key_id": "<secret>",
    "aws_secret_access_key": "<secret>",
    "aws_region": "us-east-1",
}
df = pl.scan_parquet(source, storage_options=storage_options)

このクエリは、ファイルをダウンロードせずに LazyFrame を作成します。 LazyFrame では、スキーマなどのファイルメタデータにアクセスできます。 Polars は内部で object_store.rs ライブラリを使ってクラウドストレージプロバイダとのインターフェースを管理しているため、クラウド Parquet ファイルをスキャンするためにPythonで追加の依存関係は必要ありません。

述語と射影のプッシュダウンを使ってレイジークエリを作成すると、ファイルがダウンロードされる前にクエリオプティマイザーが適用します。これにより、ダウンロードする必要のあるデータ量を大幅に削減できます。クエリの評価は collect を呼び出すことで開始されます。

import polars as pl

source = "s3://bucket/*.parquet"


df = pl.scan_parquet(source).filter(pl.col("id") < 100).select("id","value").collect()

Pythonアロー(PyArrow)によるスキャン

PyArrowを使ってクラウドストレージからスキャンすることもできます。これは、Hiveパーティショニングなどのパーティション化されたデータセットに特に便利です。

まず、PyArrowデータセットを作成し、その後にデータセットからLazyFrameを作成します。

scan_pyarrow_dataset

import polars as pl
import pyarrow.dataset as ds

dset = ds.dataset("s3://my-partitioned-folder/", format="parquet")
(
    pl.scan_pyarrow_dataset(dset)
    .filter(pl.col("foo") == "a")
    .select(["foo", "bar"])
    .collect()
)

クラウドストレージへの書き込み

Python で s3fs (S3 用)、adlfs (Azure Blob Storage 用)、gcsfs (Google Cloud Storage 用) を使って、DataFrame をクラウドストレージに書き込むことができます。この例では、Parquet ファイルを S3 に書き込みます。

write_parquet

import polars as pl
import s3fs

df = pl.DataFrame({
    "foo": ["a", "b", "c", "d", "d"],
    "bar": [1, 2, 3, 4, 5],
})

fs = s3fs.S3FileSystem()
destination = "s3://bucket/my_file.parquet"

# write parquet
with fs.open(destination, mode='wb') as f:
    df.write_parquet(f)