パーティションとシャッフルの最適化:Spark チューニングの核心

この記事について

Spark の性能問題の大半はパーティションとシャッフルに起因する。この記事では、パーティションの設計とシャッフルの最適化手法を整理する。

1. パーティションの基本

1-1. パーティションとは

DataFrame のデータを複数のチャンクに分割したもの。1パーティション = 1タスク = 1コアで処理。

DataFrame(100万行)
  ├── Partition 0: 25万行 → Task 0 → Executor 1, Core 1
  ├── Partition 1: 25万行 → Task 1 → Executor 1, Core 2
  ├── Partition 2: 25万行 → Task 2 → Executor 2, Core 1
  └── Partition 3: 25万行 → Task 3 → Executor 2, Core 2

1-2. パーティション数の影響

パーティション数 問題
少なすぎる 並列度が低い。一部のタスクに負荷集中。メモリ不足のリスク
多すぎる タスクのスケジューリングオーバーヘッド。小さなファイルが大量に生成
適切 コア数の2〜4倍。各パーティション 128MB〜1GB 程度

1-3. パーティション数の確認と変更

# 現在のパーティション数
df.rdd.getNumPartitions()

# パーティション数を増やす(シャッフルあり)
df.repartition(200)

# パーティション数を減らす(シャッフルなし、既存パーティションを結合)
df.coalesce(50)

# キーでパーティション分割(シャッフルあり)
df.repartition("region")
df.repartition(100, "region")

1-4. repartition vs coalesce

repartition coalesce
シャッフル あり なし
パーティション数 増減どちらも可 減少のみ
データ分布 均等に再分配 既存パーティションを結合(偏りが残る可能性)
用途 データスキュー解消、キー分割 書き込み前にファイル数を減らす
# 典型的なパターン:処理後に書き込み前でファイル数を調整
df.filter(...) \
  .groupBy(...).agg(...) \
  .coalesce(10) \
  .write.parquet("s3://output/")
# → 10個の Parquet ファイルが生成される

2. シャッフルの最適化

2-1. シャッフルパーティション数

# デフォルト: 200(多くの場合、調整が必要)
spark.conf.set("spark.sql.shuffle.partitions", 200)
データ量 推奨パーティション数 根拠
< 1GB 10〜50 小さなタスクのオーバーヘッドを避ける
1〜10GB 50〜200 デフォルト付近
10〜100GB 200〜1000 各パーティション 128MB 程度を目安
> 100GB 1000〜10000 コア数の2〜4倍を目安

AQE が有効なら spark.sql.adaptive.coalescePartitions.enabled = true で自動調整される。

2-2. シャッフルを減らすテクニック

Broadcast Join

小さなテーブルとの Join はブロードキャストでシャッフルを回避。

from pyspark.sql.functions import broadcast

# 明示的にブロードキャスト指定
result = large_df.join(broadcast(small_df), "key")

# 閾値の調整(デフォルト 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  # 50MB

Bucket Join

事前にデータを同じキーでバケット分割しておくと、Join 時のシャッフルが不要になる。

# 書き込み時にバケット分割
df.write.bucketBy(100, "product_id").sortBy("product_id") \
  .saveAsTable("bucketed_sales")

# バケット同士の Join → シャッフルなし
sales = spark.table("bucketed_sales")
products = spark.table("bucketed_products")  # 同じキー・同じバケット数
result = sales.join(products, "product_id")

事前集約(Partial Aggregation)

# 悪い例:全データをシャッフルしてから集約
df.groupBy("region").agg(sum("amount"))

# Spark は自動的に partial aggregation を行う:
# 1. 各パーティション内で部分集約(Map側)
# 2. 部分結果だけをシャッフル(データ量が大幅に減る)
# 3. シャッフル後に最終集約(Reduce側)
# → ユーザーが意識しなくても Catalyst が最適化

3. データスキュー対策

3-1. スキューの検出

# パーティションごとのサイズを確認
df.groupBy(spark_partition_id()).count().show()

# Spark UI の Stage 詳細で Task の実行時間のばらつきを確認
# → 1つだけ極端に遅い Task があればスキュー

3-2. 対策手法

手法 説明
AQE Skew Join Spark 3.0+。自動的にスキューパーティションを分割(推奨)
Salt Key Join キーにランダム値を付加して分散させる
2段階集約 まず Salt 付きで部分集約、次に Salt を除いて最終集約
Broadcast スキューの原因テーブルが小さければブロードキャスト

Salt Key の例

from pyspark.sql.functions import lit, rand, concat, col

# 問題:region = "東京" のデータが全体の80%を占める
# → groupBy("region") で東京のパーティションだけ巨大になる

# Salt Key で分散
salt_num = 10
salted = df.withColumn("salt", (rand() * salt_num).cast("int"))

# 1段階目:salt 付きで集約
partial = salted.groupBy("region", "salt").agg(sum("amount").alias("partial_sum"))

# 2段階目:salt を除いて最終集約
result = partial.groupBy("region").agg(sum("partial_sum").alias("total"))

4. キャッシュとパーシスト

4-1. いつキャッシュすべきか

同じ DataFrame を複数回使う場合にキャッシュが有効。

# キャッシュなし:df を2回計算する
result1 = df.filter(...).groupBy(...).agg(...)
result2 = df.filter(...).groupBy(...).agg(...)

# キャッシュあり:df を1回だけ計算してメモリに保持
df.cache()  # または df.persist()
result1 = df.filter(...).groupBy(...).agg(...)
result2 = df.filter(...).groupBy(...).agg(...)
df.unpersist()  # 不要になったら解放

4-2. ストレージレベル

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)        # メモリのみ(デフォルト)
df.persist(StorageLevel.MEMORY_AND_DISK)    # メモリに入りきらなければディスク
df.persist(StorageLevel.DISK_ONLY)          # ディスクのみ
df.persist(StorageLevel.MEMORY_ONLY_SER)    # シリアライズしてメモリ(省メモリ)

4-3. キャッシュの注意点

  • 1回しか使わない DataFrame はキャッシュしない(メモリの無駄)
  • キャッシュはメモリを消費する → Execution メモリが減る → シャッフルが遅くなる可能性
  • 不要になったら unpersist() で明示的に解放

5. 書き込みの最適化

5-1. ファイル数の制御

# パーティション数 = 出力ファイル数
df.repartition(10).write.parquet("s3://output/")  # 10ファイル
df.coalesce(1).write.parquet("s3://output/")       # 1ファイル(小データ向け)

# パーティションカラム指定時
df.repartition("date").write.partitionBy("date").parquet("s3://output/")
# → 日付ごとにディレクトリ分割、各ディレクトリ内のファイル数はパーティション数に依存

5-2. 小さなファイル問題の回避

問題:
  ストリーミングや頻繁な書き込みで小さなファイルが大量に生成
  → 読み取り時のオーバーヘッド増大

対策:
  1. coalesce / repartition で書き込み前にファイル数を調整
  2. Delta Lake の OPTIMIZE コマンドで事後的にファイル結合
  3. Auto Compaction(Databricks)で自動的に結合

6. Spark UI の読み方

6-1. 確認すべきポイント

タブ 確認内容
Jobs 各 Job の実行時間、失敗の有無
Stages Stage ごとの Task 数、シャッフル読み書き量
Tasks 個々の Task の実行時間(スキューの検出)
Storage キャッシュされた DataFrame のサイズ
SQL 実行計画の可視化、各ノードの処理時間

6-2. よくあるボトルネックパターン

症状 原因 対策
1つの Task だけ極端に遅い データスキュー AQE / Salt Key / Broadcast
シャッフル読み書きが巨大 不要なシャッフル Broadcast Join / Bucket Join
GC 時間が長い メモリ不足 Executor メモリ増加 / パーティション数増加
Task 数が少ない パーティション不足 repartition で増やす
小さな Task が大量 パーティション過多 coalesce / AQE で結合

7. まとめ

テーマ 要点
パーティション数 コア数の2〜4倍、各パーティション 128MB〜1GB が目安
repartition vs coalesce 増やす→repartition、減らす→coalesce
シャッフル削減 Broadcast Join、Bucket Join、AQE
データスキュー AQE(推奨)、Salt Key、2段階集約
キャッシュ 複数回使う DataFrame のみ。不要になったら unpersist
書き込み coalesce でファイル数制御。Delta Lake の OPTIMIZE で事後結合

Spark チューニングの原則: 1. シャッフルを減らす 2. パーティションを適切に設計する 3. Spark UI で実行計画とタスク分布を確認する

次回は Spark の Structured Streaming を AWS Kinesis / Oracle Streaming と比較する。

参考文献

  • Jules S. Damji et al. “Learning Spark.” 2nd Edition, O’Reilly Media, 2020. Chapter 7.
  • Databricks. “Best Practices for Spark Performance.” https://docs.databricks.com/en/optimizations/
  • Holden Karau, Rachel Warren. “High Performance Spark.” O’Reilly Media, 2017.

コメントする