DataFrame API と Catalyst Optimizer:Spark の最適化エンジンを理解する

この記事について

Spark DataFrame が高速な理由は Catalyst Optimizer にある。この記事では DataFrame API の使い方と、Catalyst がクエリをどう最適化するかを整理する。

1. DataFrame API

1-1. 基本操作

# 読み込み
df = spark.read.format("parquet").load("s3://data/sales/")

# 射影(カラム選択)
df.select("order_id", "amount", "region")

# フィルタ
df.filter(df.amount > 100)
df.where("amount > 100")  # SQL風の書き方も可能

# 集約
df.groupBy("region").agg(
    count("order_id").alias("order_count"),
    sum("amount").alias("total_amount"),
    avg("amount").alias("avg_amount")
)

# 結合
orders.join(products, orders.product_id == products.product_id, "inner")

# ソート
df.orderBy(df.amount.desc())

# 書き込み
df.write.format("delta").mode("overwrite").save("s3://data/output/")

1-2. Spark SQL との互換性

DataFrame と SQL は内部的に同じ実行計画に変換される。

# DataFrame API
df.filter(df.amount > 100).groupBy("region").agg(sum("amount"))

# Spark SQL(同じ結果、同じ実行計画)
spark.sql("""
    SELECT region, SUM(amount)
    FROM sales
    WHERE amount > 100
    GROUP BY region
""")

どちらを使っても Catalyst が同じ最適化を適用するため、性能差はない。

2. Catalyst Optimizer

2-1. 最適化パイプライン

ユーザーコード(DataFrame API / SQL)
    │
    ▼
┌──────────────────┐
│ 1. 解析(Analysis)│  ← テーブル・カラムの存在確認、型解決
└────────┬─────────┘
         ▼
┌──────────────────┐
│ 2. 論理最適化      │  ← ルールベースの書き換え
│  (Logical Plan)   │
└────────┬─────────┘
         ▼
┌──────────────────┐
│ 3. 物理計画       │  ← 実行戦略の選択(Join方式など)
│  (Physical Plan)  │
└────────┬─────────┘
         ▼
┌──────────────────┐
│ 4. コード生成      │  ← Whole-Stage Code Generation
│  (Code Gen)       │
└────────┬─────────┘
         ▼
    実行(RDD レベル)

2-2. 論理最適化の主なルール

最適化 説明
Predicate Pushdown フィルタをデータソースに近い位置に移動 JOIN の後の WHERE を JOIN の前に移動
Projection Pushdown 必要なカラムだけ読み取り SELECT で使うカラムだけ Parquet から読む
Constant Folding 定数式を事前計算 WHERE year = 2020 + 4WHERE year = 2024
Predicate Simplification 冗長な条件を簡略化 WHERE true AND x > 5WHERE x > 5
Column Pruning 不要なカラムを早期に除外 下流で使わないカラムを読み込まない
Combine Filters 連続するフィルタを結合 .filter(A).filter(B).filter(A AND B)

2-3. Predicate Pushdown の効果

最適化前:
  Parquet全カラム読み込み → JOIN → WHERE region = 'APAC'
  → 全データを読んでからフィルタ(遅い)

最適化後(Predicate Pushdown):
  Parquet から region = 'APAC' のデータだけ読み込み → JOIN
  → 読み取りデータ量が大幅に削減(速い)

Parquet の Footer にある min/max 統計情報と組み合わせると、該当しない Row Group 全体をスキップできる。

2-4. 物理計画:Join 戦略の選択

Catalyst は統計情報に基づいて最適な Join 方式を選択する。

Join 方式 条件 特徴
Broadcast Hash Join 片方が小さい(< 10MB デフォルト) 小テーブルを全 Executor にブロードキャスト。シャッフルなし。最速
Sort Merge Join 両方が大きい 両方を Join キーでソート・シャッフルしてマージ。汎用的
Shuffle Hash Join 片方がやや小さい ハッシュテーブルを構築。ソート不要だがメモリを使う
Broadcast Hash Join(シャッフルなし):
  大テーブル [Executor 1] ← 小テーブル(ブロードキャスト)
  大テーブル [Executor 2] ← 小テーブル(ブロードキャスト)
  大テーブル [Executor 3] ← 小テーブル(ブロードキャスト)
  → 各 Executor がローカルで Join。ネットワーク転送は小テーブル分だけ

Sort Merge Join(シャッフルあり):
  テーブルA ──シャッフル──→ [Executor 1: key=1〜100]  ← テーブルB(同じキー範囲)
  テーブルA ──シャッフル──→ [Executor 2: key=101〜200] ← テーブルB(同じキー範囲)
  → 両テーブルを key で再分配してからマージ

3. Adaptive Query Execution(AQE)

Spark 3.0 で導入された実行時最適化。実行中の統計情報に基づいて計画を動的に変更する。

機能 説明
Coalesce Shuffle Partitions シャッフル後の小さなパーティションを自動結合
Switch Join Strategy 実行時の実際のデータサイズで Join 方式を切り替え
Skew Join Optimization データ偏りのあるパーティションを自動分割

データスキュー問題と AQE

スキューなし(均等):
  Partition 1: 1000行  → Task 1: 1秒
  Partition 2: 1000行  → Task 2: 1秒
  Partition 3: 1000行  → Task 3: 1秒
  合計: 1秒(並列実行)

スキューあり(偏り):
  Partition 1: 100行   → Task 1: 0.1秒
  Partition 2: 100行   → Task 2: 0.1秒
  Partition 3: 99800行 → Task 3: 100秒  ← ボトルネック
  合計: 100秒(Task 3 待ち)

AQE のスキュー対策:
  Partition 3 を自動的に分割
  → Partition 3a: 33000行, 3b: 33000行, 3c: 33800行
  → 並列処理で約33秒に短縮

4. 実行計画の確認方法

# 論理計画 + 物理計画を表示
df.explain(True)

# Spark 3.0+: フォーマット指定
df.explain("formatted")
== Physical Plan ==
*(2) HashAggregate(keys=[region], functions=[sum(amount)])
+- Exchange hashpartitioning(region, 200)    ← シャッフル
   +- *(1) HashAggregate(keys=[region], functions=[partial_sum(amount)])
      +- *(1) Filter (amount > 100)          ← Predicate Pushdown
         +- *(1) ColumnarToRow
            +- FileScan parquet [region,amount]  ← 必要カラムだけ読み取り
               PushedFilters: [GreaterThan(amount,100)]

読み方のポイント: – Exchange = シャッフル(ここが Stage の境界) – PushedFilters = データソースにプッシュダウンされたフィルタ – FileScan parquet [region,amount] = 2カラムだけ読み取り(Column Pruning) – *(N) = Whole-Stage Code Generation の範囲

5. Whole-Stage Code Generation

Catalyst の最終段階で、複数の演算子を1つの Java メソッドにコンパイルする。

従来(Volcano Model):
  各演算子が1行ずつ処理 → 関数呼び出しのオーバーヘッドが大きい
  Filter.next() → Project.next() → Scan.next() → ...

Whole-Stage Code Gen:
  複数の演算子を1つのループにまとめてコンパイル
  → 仮想関数呼び出しなし、CPU パイプラインに乗りやすい

Databricks の Photon エンジンはこれをさらに進め、C++ でネイティブ実行する。

6. まとめ

概念 要点
DataFrame API SQL 的な操作。Python でも Scala でも同じ性能
Catalyst 4段階の最適化パイプライン(解析→論理→物理→コード生成)
Predicate/Projection Pushdown 読み取りデータ量を最小化する最も効果的な最適化
Join 戦略 Broadcast(小×大)> Sort Merge(大×大)。Catalyst が自動選択
AQE 実行時にパーティション結合、Join 切替、スキュー対策を動的に実行
explain() 実行計画を確認してボトルネックを特定する基本ツール

次回は「パーティションとシャッフルの最適化」を解説する。

参考文献

  • Jules S. Damji et al. “Learning Spark.” 2nd Edition, O’Reilly Media, 2020. Chapter 3, 7.
  • Michael Armbrust et al. “Spark SQL: Relational Data Processing in Spark.” SIGMOD, 2015.
  • Databricks. “Adaptive Query Execution.” https://docs.databricks.com/en/optimizations/aqe.html

コメントする