この記事について
Apache Spark は大規模データの分散処理エンジンであり、Databricks の基盤技術である。この記事では Spark のアーキテクチャと、ジョブがどのように分解・実行されるかを整理する。
1. Spark の位置づけ
1-1. Hadoop MapReduce との違い
Spark は Hadoop MapReduce の後継として 2014 年に登場した。
| 観点 | MapReduce | Spark |
|---|---|---|
| 中間データ | ディスクに書き出し | メモリに保持 |
| 処理モデル | Map → Reduce の2段階のみ | 任意の DAG(多段パイプライン) |
| 速度 | 遅い(毎回ディスクI/O) | 速い(メモリ内処理) |
| API | Java 中心 | Python, Scala, Java, SQL, R |
| ストリーミング | 別フレームワーク(Storm等) | Structured Streaming で統合 |
1-2. Spark のエコシステム
┌─────────────────────────────────────────┐
│ Spark アプリケーション │
│ │
│ ┌─────────┐ ┌──────┐ ┌─────────────┐ │
│ │Spark SQL│ │MLlib │ │Structured │ │
│ │DataFrame│ │(ML) │ │Streaming │ │
│ └────┬────┘ └──┬───┘ └──────┬──────┘ │
│ └─────────┼────────────┘ │
│ ┌────┴────┐ │
│ │Spark Core│ │
│ │ (RDD) │ │
│ └────┬────┘ │
└─────────────────┼───────────────────────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
YARN Kubernetes Standalone
(Cluster Manager)
│ │ │
▼ ▼ ▼
HDFS / S3 / ADLS / GCS(ストレージ)
2. アーキテクチャ
2-1. 基本構成
┌──────────────────────────────────────────────────┐
│ Spark Application │
│ │
│ ┌──────────────┐ │
│ │ Driver │ ← アプリケーションの司令塔 │
│ │ │ │
│ │ SparkContext │ ← Spark との接続点 │
│ │ DAGScheduler│ ← Job → Stage に分解 │
│ │ TaskScheduler│ ← Task を Executor に配分 │
│ └──────┬───────┘ │
│ │ タスク配分 │
│ ┌────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │Executor│ │Executor│ │Executor│ │
│ │ │ │ │ │ │ │
│ │ Task │ │ Task │ │ Task │ │
│ │ Task │ │ Task │ │ Task │ │
│ │ Cache │ │ Cache │ │ Cache │ │
│ └────────┘ └────────┘ └────────┘ │
│ │
└──────────────────────────────────────────────────┘
│
▼
Cluster Manager(YARN / K8s / Standalone / Databricks)
2-2. 各コンポーネントの役割
| コンポーネント | 役割 |
|---|---|
| Driver | アプリケーションの main() を実行。SparkContext を生成し、ジョブ全体を管理 |
| SparkContext | Spark クラスタとの接続。リソース要求、ジョブ投入の窓口 |
| Executor | Worker ノード上で動くプロセス。タスクを実行し、データをキャッシュ |
| Task | Executor 上で実行される最小の処理単位。1パーティション = 1タスク |
| Cluster Manager | リソース(CPU、メモリ)の割り当てを管理 |
2-3. Databricks での違い
Databricks では Cluster Manager が Databricks 独自の管理層に置き換わる。
| 標準 Spark | Databricks |
|---|---|
| YARN / K8s / Standalone | Databricks Cluster Manager |
| 手動でクラスタ構成 | UI / API でクラスタ作成、自動スケール |
| spark-submit でジョブ投入 | ノートブック or Workflows |
3. RDD → DataFrame → Dataset
3-1. 歴史的変遷
| 世代 | API | 登場 | 特徴 |
|---|---|---|---|
| 第1世代 | RDD | 2014 (Spark 1.0) | 低レベル、型安全、最適化なし |
| 第2世代 | DataFrame | 2015 (Spark 1.3) | SQL的、Catalyst で最適化、型安全でない |
| 第3世代 | Dataset | 2016 (Spark 1.6) | DataFrame + 型安全(Scala/Java のみ) |
3-2. RDD(Resilient Distributed Dataset)
Spark の最も基本的なデータ構造。分散された不変のデータコレクション。
# RDD の例
rdd = sc.parallelize([1, 2, 3, 4, 5])
result = rdd.filter(lambda x: x > 2).map(lambda x: x * 10).collect()
# [30, 40, 50]RDD の特徴: – 不変(Immutable):変換すると新しい RDD が生まれる – 分散:複数ノードにパーティション分割 – 耐障害性(Resilient):リネージ(変換の履歴)から再計算可能 – 最適化なし:ユーザーが書いた通りに実行される
3-3. DataFrame
行と列を持つ分散テーブル。SQL のテーブルや pandas の DataFrame に相当。
# DataFrame の例
df = spark.read.parquet("s3://data/sales/")
result = df.filter(df.amount > 100) \
.groupBy("region") \
.agg({"amount": "sum"})DataFrame の利点: – Catalyst Optimizer が自動的にクエリを最適化 – Tungsten エンジンがメモリ管理を最適化 – Python / Scala / Java / R / SQL で同じ性能(最適化後は同じ実行計画)
3-4. 現在の推奨
RDD → 低レベル制御が必要な場合のみ(ほぼ使わない)
DataFrame → Python ユーザーの標準(PySpark)
Dataset → Scala/Java で型安全が必要な場合
Spark SQL → SQL に慣れたユーザー
Databricks では DataFrame API または Spark SQL が推奨。
4. Lazy Evaluation
4-1. Transformation と Action
Spark の操作は2種類に分かれる。
| 種類 | 説明 | 実行タイミング | 例 |
|---|---|---|---|
| Transformation | 新しい RDD/DataFrame を返す | 遅延(実行されない) | filter, map, groupBy, join |
| Action | 結果を返す or 副作用を起こす | 即座に実行 | collect, count, show, write |
# これだけでは何も実行されない(Transformation の連鎖)
df2 = df.filter(df.amount > 100) # Transformation
df3 = df2.groupBy("region") # Transformation
df4 = df3.agg({"amount": "sum"}) # Transformation
# Action を呼んだ瞬間に、上の全 Transformation がまとめて実行される
df4.show() # Action → ここで初めて実行4-2. なぜ遅延評価か
遅延評価のメリット:
1. 最適化の余地が生まれる
→ Catalyst が Transformation の連鎖を見て、最適な実行計画を立てる
→ 例:不要なカラムの読み取りをスキップ(Projection Pushdown)
2. 不要な計算を省ける
→ 途中の Transformation の結果が不要なら計算しない
3. パイプライン化
→ 複数の Transformation を1パスで処理(中間データを書き出さない)
5. Job → Stage → Task
Action が呼ばれると、Spark は以下の階層で処理を分解する。
5-1. 全体像
Application(1つの SparkContext)
│
├── Job 1(1つの Action に対応)
│ ├── Stage 1(シャッフル境界で分割)
│ │ ├── Task 1(パーティション1を処理)
│ │ ├── Task 2(パーティション2を処理)
│ │ └── Task 3(パーティション3を処理)
│ └── Stage 2
│ ├── Task 1
│ └── Task 2
│
└── Job 2(別の Action)
└── Stage 3
└── ...
5-2. Narrow vs Wide Transformation
Stage の境界は「シャッフルが必要かどうか」で決まる。
| 種類 | シャッフル | 説明 | 例 |
|---|---|---|---|
| Narrow | なし | 各パーティションが独立に処理可能 | filter, map, union |
| Wide | あり | パーティション間のデータ交換が必要 | groupBy, join, repartition |
Narrow Transformation(パイプライン化可能):
Partition 1 → filter → map → 結果1
Partition 2 → filter → map → 結果2
Partition 3 → filter → map → 結果3
→ 各パーティションが独立。ネットワーク通信なし
Wide Transformation(シャッフル発生):
Partition 1 ─┐
Partition 2 ─┼── groupBy ──→ 新 Partition A(key=東京のデータ)
Partition 3 ─┘ 新 Partition B(key=大阪のデータ)
→ 全パーティションのデータを key で再分配。ネットワーク通信あり
5-3. シャッフルが遅い理由
シャッフルで起きること:
1. Map側: 各 Executor がデータを key でソート・分割してディスクに書き出し
2. ネットワーク: 分割されたデータを対応する Executor に転送
3. Reduce側: 受け取ったデータをマージ
ボトルネック:
- ディスクI/O(中間データの書き出し・読み込み)
- ネットワーク転送(大量データの移動)
- メモリ(ソート・マージに必要)
シャッフルを減らすことが Spark チューニングの最重要ポイント。
6. メモリ管理
6-1. Executor のメモリ構成
Executor Memory(spark.executor.memory)
┌─────────────────────────────────────────┐
│ │
│ ┌─────────────────────────────────┐ │
│ │ Unified Memory(60%) │ │
│ │ │ │
│ │ ┌───────────┬───────────────┐ │ │
│ │ │ Storage │ Execution │ │ │
│ │ │ (キャッシュ) │ (シャッフル等) │ │ │
│ │ │ │ │ │ │
│ │ │ ← 境界は動的に移動 → │ │ │
│ │ └───────────┴───────────────┘ │ │
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ User Memory(40%) │ │
│ │ ユーザーのデータ構造、UDF等 │ │
│ └─────────────────────────────────┘ │
│ │
│ Reserved Memory(300MB固定) │
└─────────────────────────────────────────┘
Unified Memory では Storage と Execution が動的にメモリを融通し合う。Execution が足りなければ Storage のキャッシュを追い出し、逆も可能。
6-2. Tungsten
Spark 独自のメモリ管理エンジン。JVM のガベージコレクション(GC)を回避するため、オフヒープメモリを直接管理する。
| 機能 | 説明 |
|---|---|
| バイナリ処理 | Java オブジェクトではなくバイト配列で処理 |
| コード生成 | 実行時にJavaバイトコードを生成(Whole-Stage Code Generation) |
| キャッシュ効率 | CPU キャッシュに乗りやすいメモリレイアウト |
7. 耐障害性
7-1. リネージによる再計算
RDD / DataFrame は変換の履歴(リネージ)を記録している。Executor が障害で失われた場合、リネージを辿って失われたパーティションだけを再計算する。
リネージ:
raw_data → filter → groupBy → agg
↑
Executor 障害でこのパーティションが失われた
→ filter からやり直して再計算
7-2. チェックポイント
リネージが長くなりすぎると再計算コストが高くなる。チェックポイントで中間結果をストレージに永続化し、リネージを切断できる。
8. まとめ
| 概念 | 要点 |
|---|---|
| アーキテクチャ | Driver(司令塔)+ Executor(実行者)+ Cluster Manager(リソース管理) |
| データ構造 | DataFrame が標準。RDD は低レベル用 |
| Lazy Evaluation | Transformation は遅延、Action で実行。最適化の余地を生む |
| 実行階層 | Job → Stage(シャッフル境界)→ Task(パーティション単位) |
| シャッフル | Wide Transformation で発生。最大のボトルネック |
| メモリ | Unified Memory で Storage と Execution を動的に融通 |
| 耐障害性 | リネージで失われたパーティションを再計算 |
次回は Catalyst Optimizer と DataFrame API を深掘りする。
参考文献
- Jules S. Damji et al. “Learning Spark.” 2nd Edition, O’Reilly Media, 2020.
- Matei Zaharia et al. “Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing.” NSDI, 2012.
- Databricks. “Apache Spark Documentation.” https://spark.apache.org/docs/latest/