この記事について
Spark DataFrame が高速な理由は Catalyst Optimizer にある。この記事では DataFrame API の使い方と、Catalyst がクエリをどう最適化するかを整理する。
1. DataFrame API
1-1. 基本操作
# 読み込み
df = spark.read.format("parquet").load("s3://data/sales/")
# 射影(カラム選択)
df.select("order_id", "amount", "region")
# フィルタ
df.filter(df.amount > 100)
df.where("amount > 100") # SQL風の書き方も可能
# 集約
df.groupBy("region").agg(
count("order_id").alias("order_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount")
)
# 結合
orders.join(products, orders.product_id == products.product_id, "inner")
# ソート
df.orderBy(df.amount.desc())
# 書き込み
df.write.format("delta").mode("overwrite").save("s3://data/output/")1-2. Spark SQL との互換性
DataFrame と SQL は内部的に同じ実行計画に変換される。
# DataFrame API
df.filter(df.amount > 100).groupBy("region").agg(sum("amount"))
# Spark SQL(同じ結果、同じ実行計画)
spark.sql("""
SELECT region, SUM(amount)
FROM sales
WHERE amount > 100
GROUP BY region
""")どちらを使っても Catalyst が同じ最適化を適用するため、性能差はない。
2. Catalyst Optimizer
2-1. 最適化パイプライン
ユーザーコード(DataFrame API / SQL)
│
▼
┌──────────────────┐
│ 1. 解析(Analysis)│ ← テーブル・カラムの存在確認、型解決
└────────┬─────────┘
▼
┌──────────────────┐
│ 2. 論理最適化 │ ← ルールベースの書き換え
│ (Logical Plan) │
└────────┬─────────┘
▼
┌──────────────────┐
│ 3. 物理計画 │ ← 実行戦略の選択(Join方式など)
│ (Physical Plan) │
└────────┬─────────┘
▼
┌──────────────────┐
│ 4. コード生成 │ ← Whole-Stage Code Generation
│ (Code Gen) │
└────────┬─────────┘
▼
実行(RDD レベル)
2-2. 論理最適化の主なルール
| 最適化 | 説明 | 例 |
|---|---|---|
| Predicate Pushdown | フィルタをデータソースに近い位置に移動 | JOIN の後の WHERE を JOIN の前に移動 |
| Projection Pushdown | 必要なカラムだけ読み取り | SELECT で使うカラムだけ Parquet から読む |
| Constant Folding | 定数式を事前計算 | WHERE year = 2020 + 4 → WHERE year = 2024 |
| Predicate Simplification | 冗長な条件を簡略化 | WHERE true AND x > 5 → WHERE x > 5 |
| Column Pruning | 不要なカラムを早期に除外 | 下流で使わないカラムを読み込まない |
| Combine Filters | 連続するフィルタを結合 | .filter(A).filter(B) → .filter(A AND B) |
2-3. Predicate Pushdown の効果
最適化前:
Parquet全カラム読み込み → JOIN → WHERE region = 'APAC'
→ 全データを読んでからフィルタ(遅い)
最適化後(Predicate Pushdown):
Parquet から region = 'APAC' のデータだけ読み込み → JOIN
→ 読み取りデータ量が大幅に削減(速い)
Parquet の Footer にある min/max 統計情報と組み合わせると、該当しない Row Group 全体をスキップできる。
2-4. 物理計画:Join 戦略の選択
Catalyst は統計情報に基づいて最適な Join 方式を選択する。
| Join 方式 | 条件 | 特徴 |
|---|---|---|
| Broadcast Hash Join | 片方が小さい(< 10MB デフォルト) | 小テーブルを全 Executor にブロードキャスト。シャッフルなし。最速 |
| Sort Merge Join | 両方が大きい | 両方を Join キーでソート・シャッフルしてマージ。汎用的 |
| Shuffle Hash Join | 片方がやや小さい | ハッシュテーブルを構築。ソート不要だがメモリを使う |
Broadcast Hash Join(シャッフルなし):
大テーブル [Executor 1] ← 小テーブル(ブロードキャスト)
大テーブル [Executor 2] ← 小テーブル(ブロードキャスト)
大テーブル [Executor 3] ← 小テーブル(ブロードキャスト)
→ 各 Executor がローカルで Join。ネットワーク転送は小テーブル分だけ
Sort Merge Join(シャッフルあり):
テーブルA ──シャッフル──→ [Executor 1: key=1〜100] ← テーブルB(同じキー範囲)
テーブルA ──シャッフル──→ [Executor 2: key=101〜200] ← テーブルB(同じキー範囲)
→ 両テーブルを key で再分配してからマージ
3. Adaptive Query Execution(AQE)
Spark 3.0 で導入された実行時最適化。実行中の統計情報に基づいて計画を動的に変更する。
| 機能 | 説明 |
|---|---|
| Coalesce Shuffle Partitions | シャッフル後の小さなパーティションを自動結合 |
| Switch Join Strategy | 実行時の実際のデータサイズで Join 方式を切り替え |
| Skew Join Optimization | データ偏りのあるパーティションを自動分割 |
データスキュー問題と AQE
スキューなし(均等):
Partition 1: 1000行 → Task 1: 1秒
Partition 2: 1000行 → Task 2: 1秒
Partition 3: 1000行 → Task 3: 1秒
合計: 1秒(並列実行)
スキューあり(偏り):
Partition 1: 100行 → Task 1: 0.1秒
Partition 2: 100行 → Task 2: 0.1秒
Partition 3: 99800行 → Task 3: 100秒 ← ボトルネック
合計: 100秒(Task 3 待ち)
AQE のスキュー対策:
Partition 3 を自動的に分割
→ Partition 3a: 33000行, 3b: 33000行, 3c: 33800行
→ 並列処理で約33秒に短縮
4. 実行計画の確認方法
# 論理計画 + 物理計画を表示
df.explain(True)
# Spark 3.0+: フォーマット指定
df.explain("formatted")== Physical Plan ==
*(2) HashAggregate(keys=[region], functions=[sum(amount)])
+- Exchange hashpartitioning(region, 200) ← シャッフル
+- *(1) HashAggregate(keys=[region], functions=[partial_sum(amount)])
+- *(1) Filter (amount > 100) ← Predicate Pushdown
+- *(1) ColumnarToRow
+- FileScan parquet [region,amount] ← 必要カラムだけ読み取り
PushedFilters: [GreaterThan(amount,100)]
読み方のポイント: – Exchange = シャッフル(ここが Stage の境界) – PushedFilters = データソースにプッシュダウンされたフィルタ – FileScan parquet [region,amount] = 2カラムだけ読み取り(Column Pruning) – *(N) = Whole-Stage Code Generation の範囲
5. Whole-Stage Code Generation
Catalyst の最終段階で、複数の演算子を1つの Java メソッドにコンパイルする。
従来(Volcano Model):
各演算子が1行ずつ処理 → 関数呼び出しのオーバーヘッドが大きい
Filter.next() → Project.next() → Scan.next() → ...
Whole-Stage Code Gen:
複数の演算子を1つのループにまとめてコンパイル
→ 仮想関数呼び出しなし、CPU パイプラインに乗りやすい
Databricks の Photon エンジンはこれをさらに進め、C++ でネイティブ実行する。
6. まとめ
| 概念 | 要点 |
|---|---|
| DataFrame API | SQL 的な操作。Python でも Scala でも同じ性能 |
| Catalyst | 4段階の最適化パイプライン(解析→論理→物理→コード生成) |
| Predicate/Projection Pushdown | 読み取りデータ量を最小化する最も効果的な最適化 |
| Join 戦略 | Broadcast(小×大)> Sort Merge(大×大)。Catalyst が自動選択 |
| AQE | 実行時にパーティション結合、Join 切替、スキュー対策を動的に実行 |
| explain() | 実行計画を確認してボトルネックを特定する基本ツール |
次回は「パーティションとシャッフルの最適化」を解説する。
参考文献
- Jules S. Damji et al. “Learning Spark.” 2nd Edition, O’Reilly Media, 2020. Chapter 3, 7.
- Michael Armbrust et al. “Spark SQL: Relational Data Processing in Spark.” SIGMOD, 2015.
- Databricks. “Adaptive Query Execution.” https://docs.databricks.com/en/optimizations/aqe.html