この記事について
Spark の性能問題の大半はパーティションとシャッフルに起因する。この記事では、パーティションの設計とシャッフルの最適化手法を整理する。
1. パーティションの基本
1-1. パーティションとは
DataFrame のデータを複数のチャンクに分割したもの。1パーティション = 1タスク = 1コアで処理。
DataFrame(100万行)
├── Partition 0: 25万行 → Task 0 → Executor 1, Core 1
├── Partition 1: 25万行 → Task 1 → Executor 1, Core 2
├── Partition 2: 25万行 → Task 2 → Executor 2, Core 1
└── Partition 3: 25万行 → Task 3 → Executor 2, Core 2
1-2. パーティション数の影響
| パーティション数 | 問題 |
|---|---|
| 少なすぎる | 並列度が低い。一部のタスクに負荷集中。メモリ不足のリスク |
| 多すぎる | タスクのスケジューリングオーバーヘッド。小さなファイルが大量に生成 |
| 適切 | コア数の2〜4倍。各パーティション 128MB〜1GB 程度 |
1-3. パーティション数の確認と変更
# 現在のパーティション数
df.rdd.getNumPartitions()
# パーティション数を増やす(シャッフルあり)
df.repartition(200)
# パーティション数を減らす(シャッフルなし、既存パーティションを結合)
df.coalesce(50)
# キーでパーティション分割(シャッフルあり)
df.repartition("region")
df.repartition(100, "region")1-4. repartition vs coalesce
| repartition | coalesce | |
|---|---|---|
| シャッフル | あり | なし |
| パーティション数 | 増減どちらも可 | 減少のみ |
| データ分布 | 均等に再分配 | 既存パーティションを結合(偏りが残る可能性) |
| 用途 | データスキュー解消、キー分割 | 書き込み前にファイル数を減らす |
# 典型的なパターン:処理後に書き込み前でファイル数を調整
df.filter(...) \
.groupBy(...).agg(...) \
.coalesce(10) \
.write.parquet("s3://output/")
# → 10個の Parquet ファイルが生成される2. シャッフルの最適化
2-1. シャッフルパーティション数
# デフォルト: 200(多くの場合、調整が必要)
spark.conf.set("spark.sql.shuffle.partitions", 200)| データ量 | 推奨パーティション数 | 根拠 |
|---|---|---|
| < 1GB | 10〜50 | 小さなタスクのオーバーヘッドを避ける |
| 1〜10GB | 50〜200 | デフォルト付近 |
| 10〜100GB | 200〜1000 | 各パーティション 128MB 程度を目安 |
| > 100GB | 1000〜10000 | コア数の2〜4倍を目安 |
AQE が有効なら spark.sql.adaptive.coalescePartitions.enabled = true で自動調整される。
2-2. シャッフルを減らすテクニック
Broadcast Join
小さなテーブルとの Join はブロードキャストでシャッフルを回避。
from pyspark.sql.functions import broadcast
# 明示的にブロードキャスト指定
result = large_df.join(broadcast(small_df), "key")
# 閾値の調整(デフォルト 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) # 50MBBucket Join
事前にデータを同じキーでバケット分割しておくと、Join 時のシャッフルが不要になる。
# 書き込み時にバケット分割
df.write.bucketBy(100, "product_id").sortBy("product_id") \
.saveAsTable("bucketed_sales")
# バケット同士の Join → シャッフルなし
sales = spark.table("bucketed_sales")
products = spark.table("bucketed_products") # 同じキー・同じバケット数
result = sales.join(products, "product_id")事前集約(Partial Aggregation)
# 悪い例:全データをシャッフルしてから集約
df.groupBy("region").agg(sum("amount"))
# Spark は自動的に partial aggregation を行う:
# 1. 各パーティション内で部分集約(Map側)
# 2. 部分結果だけをシャッフル(データ量が大幅に減る)
# 3. シャッフル後に最終集約(Reduce側)
# → ユーザーが意識しなくても Catalyst が最適化3. データスキュー対策
3-1. スキューの検出
# パーティションごとのサイズを確認
df.groupBy(spark_partition_id()).count().show()
# Spark UI の Stage 詳細で Task の実行時間のばらつきを確認
# → 1つだけ極端に遅い Task があればスキュー3-2. 対策手法
| 手法 | 説明 |
|---|---|
| AQE Skew Join | Spark 3.0+。自動的にスキューパーティションを分割(推奨) |
| Salt Key | Join キーにランダム値を付加して分散させる |
| 2段階集約 | まず Salt 付きで部分集約、次に Salt を除いて最終集約 |
| Broadcast | スキューの原因テーブルが小さければブロードキャスト |
Salt Key の例
from pyspark.sql.functions import lit, rand, concat, col
# 問題:region = "東京" のデータが全体の80%を占める
# → groupBy("region") で東京のパーティションだけ巨大になる
# Salt Key で分散
salt_num = 10
salted = df.withColumn("salt", (rand() * salt_num).cast("int"))
# 1段階目:salt 付きで集約
partial = salted.groupBy("region", "salt").agg(sum("amount").alias("partial_sum"))
# 2段階目:salt を除いて最終集約
result = partial.groupBy("region").agg(sum("partial_sum").alias("total"))4. キャッシュとパーシスト
4-1. いつキャッシュすべきか
同じ DataFrame を複数回使う場合にキャッシュが有効。
# キャッシュなし:df を2回計算する
result1 = df.filter(...).groupBy(...).agg(...)
result2 = df.filter(...).groupBy(...).agg(...)
# キャッシュあり:df を1回だけ計算してメモリに保持
df.cache() # または df.persist()
result1 = df.filter(...).groupBy(...).agg(...)
result2 = df.filter(...).groupBy(...).agg(...)
df.unpersist() # 不要になったら解放4-2. ストレージレベル
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY) # メモリのみ(デフォルト)
df.persist(StorageLevel.MEMORY_AND_DISK) # メモリに入りきらなければディスク
df.persist(StorageLevel.DISK_ONLY) # ディスクのみ
df.persist(StorageLevel.MEMORY_ONLY_SER) # シリアライズしてメモリ(省メモリ)4-3. キャッシュの注意点
- 1回しか使わない DataFrame はキャッシュしない(メモリの無駄)
- キャッシュはメモリを消費する → Execution メモリが減る → シャッフルが遅くなる可能性
- 不要になったら
unpersist()で明示的に解放
5. 書き込みの最適化
5-1. ファイル数の制御
# パーティション数 = 出力ファイル数
df.repartition(10).write.parquet("s3://output/") # 10ファイル
df.coalesce(1).write.parquet("s3://output/") # 1ファイル(小データ向け)
# パーティションカラム指定時
df.repartition("date").write.partitionBy("date").parquet("s3://output/")
# → 日付ごとにディレクトリ分割、各ディレクトリ内のファイル数はパーティション数に依存5-2. 小さなファイル問題の回避
問題:
ストリーミングや頻繁な書き込みで小さなファイルが大量に生成
→ 読み取り時のオーバーヘッド増大
対策:
1. coalesce / repartition で書き込み前にファイル数を調整
2. Delta Lake の OPTIMIZE コマンドで事後的にファイル結合
3. Auto Compaction(Databricks)で自動的に結合
6. Spark UI の読み方
6-1. 確認すべきポイント
| タブ | 確認内容 |
|---|---|
| Jobs | 各 Job の実行時間、失敗の有無 |
| Stages | Stage ごとの Task 数、シャッフル読み書き量 |
| Tasks | 個々の Task の実行時間(スキューの検出) |
| Storage | キャッシュされた DataFrame のサイズ |
| SQL | 実行計画の可視化、各ノードの処理時間 |
6-2. よくあるボトルネックパターン
| 症状 | 原因 | 対策 |
|---|---|---|
| 1つの Task だけ極端に遅い | データスキュー | AQE / Salt Key / Broadcast |
| シャッフル読み書きが巨大 | 不要なシャッフル | Broadcast Join / Bucket Join |
| GC 時間が長い | メモリ不足 | Executor メモリ増加 / パーティション数増加 |
| Task 数が少ない | パーティション不足 | repartition で増やす |
| 小さな Task が大量 | パーティション過多 | coalesce / AQE で結合 |
7. まとめ
| テーマ | 要点 |
|---|---|
| パーティション数 | コア数の2〜4倍、各パーティション 128MB〜1GB が目安 |
| repartition vs coalesce | 増やす→repartition、減らす→coalesce |
| シャッフル削減 | Broadcast Join、Bucket Join、AQE |
| データスキュー | AQE(推奨)、Salt Key、2段階集約 |
| キャッシュ | 複数回使う DataFrame のみ。不要になったら unpersist |
| 書き込み | coalesce でファイル数制御。Delta Lake の OPTIMIZE で事後結合 |
Spark チューニングの原則: 1. シャッフルを減らす 2. パーティションを適切に設計する 3. Spark UI で実行計画とタスク分布を確認する
次回は Spark の Structured Streaming を AWS Kinesis / Oracle Streaming と比較する。
参考文献
- Jules S. Damji et al. “Learning Spark.” 2nd Edition, O’Reilly Media, 2020. Chapter 7.
- Databricks. “Best Practices for Spark Performance.” https://docs.databricks.com/en/optimizations/
- Holden Karau, Rachel Warren. “High Performance Spark.” O’Reilly Media, 2017.