データパイプラインで最も厄介なバグは、ジョブ単体では検出できない「つなぎ目」の不整合です。ジョブAがoutput_daily.parquetを出力し、ジョブBがoutput_Daily.parquetを読む——大文字小文字が違うだけで、macOSでは動くがLinuxでは壊れます。
こうした「名前の不一致」をCIで自動検出する仕組みを構築しました。
問題:パイプラインの「つなぎ目」で起きるバグ
典型的な不一致パターン
1ジョブA: output → /data/daily_report.parquet2ジョブB: input ← /data/daily-report.parquet ← アンダースコアとハイフンジョブAもジョブBも単体テストは通ります。しかし結合すると壊れます。
実際に遭遇した不一致のバリエーションを紹介します。
| 種類 | 上流の出力 | 下流の入力 | 問題 |
|---|---|---|---|
| ケース違い | Daily.parquet | daily.parquet | macOSは通る、Linuxで失敗 |
| 区切り文字 | daily_report | daily-report | 単純なタイポ |
| 日付形式 | 2026-04-12.parquet | 20260412.parquet | フォーマット不一致 |
| ディレクトリ | /data/v2/output.csv | /data/output.csv | パス変更の伝搬漏れ |
| カラム名 | user_id (出力) | userId (入力) | 命名規則の不統一 |
なぜ単体テストでは見つからないか
1# ジョブAのテスト2def test_job_a_output():3 result = job_a.run()4 assert result.path.exists() # ✅ ファイルは生成された5 assert len(result.data) > 0 # ✅ データも入っている6
7# ジョブBのテスト8def test_job_b_processing():9 mock_input = create_test_data() # テスト用のモックデータ10 result = job_b.process(mock_input)11 assert result.is_valid() # ✅ 処理ロジックは正しいどちらのテストも合格します。しかしジョブAの実際の出力パスとジョブBの実際の入力パスが一致しているかは、どちらのテストも検証していません。
解決策:接続定義の一元管理 + CIテスト
設計思想
パイプラインの「つなぎ目」を暗黙の知識にせず、明示的な接続定義として管理します。
1暗黙: ジョブAのコード内のパスとジョブBのコード内のパスが「たまたま一致」2明示: 接続定義ファイルで「ジョブAの出力 = ジョブBの入力」を宣言接続定義ファイル
1connections:2 - name: "daily_data_flow"3 producer:4 job: "daily_fetch"5 output_key: "daily_data" # ジョブ内で定義された出力キー6 consumer:7 job: "daily_aggregation"8 input_key: "raw_data" # ジョブ内で定義された入力キー9
10 - name: "aggregation_to_report"11 producer:12 job: "daily_aggregation"13 output_key: "summary"14 consumer:2 collapsed lines
15 job: "report_generator"16 input_key: "aggregated_data"ジョブ側の定義
各ジョブは入出力をレジストリに登録します。
1# パイプラインレジストリ2PIPELINE_REGISTRY: dict[str, dict[str, str]] = {}3
4def register_output(job_name: str, key: str, path: str):5 PIPELINE_REGISTRY.setdefault(job_name, {})6 PIPELINE_REGISTRY[job_name][f"output:{key}"] = path7
8def register_input(job_name: str, key: str, path: str):9 PIPELINE_REGISTRY.setdefault(job_name, {})10 PIPELINE_REGISTRY[job_name][f"input:{key}"] = path11
12# ジョブAでの使用13register_output("daily_fetch", "daily_data", "/data/daily/{date}.parquet")14
15# ジョブBでの使用1 collapsed line
16register_input("daily_aggregation", "raw_data", "/data/daily/{date}.parquet")CIテスト: 名前一致性の検証
1import yaml2import pytest3
4def load_connections() -> list[dict]:5 with open("pipeline_connections.yaml") as f:6 return yaml.safe_load(f)["connections"]7
8def load_registry() -> dict:9 # 全ジョブのレジストリをインポートして収集10 import importlib11 for module_name in discover_job_modules():12 importlib.import_module(module_name)13 return PIPELINE_REGISTRY14
15@pytest.fixture64 collapsed lines
16def connections():17 return load_connections()18
19@pytest.fixture20def registry():21 return load_registry()22
23def test_all_connections_have_matching_paths(connections, registry):24 """全接続の入出力パスが一致することを検証"""25 errors = []26 for conn in connections:27 producer_job = conn["producer"]["job"]28 producer_key = f"output:{conn['producer']['output_key']}"29 consumer_job = conn["consumer"]["job"]30 consumer_key = f"input:{conn['consumer']['input_key']}"31
32 producer_path = registry.get(producer_job, {}).get(producer_key)33 consumer_path = registry.get(consumer_job, {}).get(consumer_key)34
35 if producer_path is None:36 errors.append(f"{conn['name']}: producer {producer_job}.{producer_key} が未登録")37 elif consumer_path is None:38 errors.append(f"{conn['name']}: consumer {consumer_job}.{consumer_key} が未登録")39 elif producer_path != consumer_path:40 errors.append(41 f"{conn['name']}: パス不一致\n"42 f" producer: {producer_path}\n"43 f" consumer: {consumer_path}"44 )45
46 assert not errors, "\n".join(errors)47
48def test_no_orphan_outputs(connections, registry):49 """消費者のいない出力を検出"""50 connected_outputs = {51 (c["producer"]["job"], c["producer"]["output_key"])52 for c in connections53 }54 all_outputs = {55 (job, key.split(":")[1])56 for job, entries in registry.items()57 for key in entries58 if key.startswith("output:")59 }60 orphans = all_outputs - connected_outputs61 if orphans:62 # 警告のみ(最終出力は消費者がいなくて正常)63 for job, key in orphans:64 print(f"⚠️ 未接続の出力: {job}.{key}")65
66def test_no_dangling_inputs(connections, registry):67 """供給者のいない入力を検出"""68 connected_inputs = {69 (c["consumer"]["job"], c["consumer"]["input_key"])70 for c in connections71 }72 all_inputs = {73 (job, key.split(":")[1])74 for job, entries in registry.items()75 for key in entries76 if key.startswith("input:")77 }78 dangling = all_inputs - connected_inputs79 assert not dangling, f"供給者のない入力: {dangling}"CI実行結果
1$ pytest tests/test_pipeline_connections.py -v2test_all_connections_have_matching_paths PASSED3test_no_orphan_outputs PASSED4test_no_dangling_inputs PASSED5
6# パス不一致がある場合:7test_all_connections_have_matching_paths FAILED8 daily_data_flow: パス不一致9 producer: /data/daily/{date}.parquet10 consumer: /data/Daily/{date}.parquet追加の検証: カラム名の一致性
ファイルパスだけでなく、データのカラム名もつなぎ目で不整合が起きやすいです。
1def test_column_compatibility(connections, registry):2 """上流の出力カラムが下流の必須カラムを満たすことを検証"""3 for conn in connections:4 if "expected_columns" not in conn.get("consumer", {}):5 continue6
7 producer_job = conn["producer"]["job"]8 expected = set(conn["consumer"]["expected_columns"])9
10 # テストデータで実際にジョブを実行して出力カラムを取得11 actual_columns = run_job_and_get_columns(producer_job)12 missing = expected - actual_columns13
14 assert not missing, (15 f"{conn['name']}: カラム不足 — "2 collapsed lines
16 f"下流が期待: {expected}, 上流に不在: {missing}"17 )段階的な導入方法
Phase 1: 最もクリティカルな接続3つだけ
いきなり全接続を定義するのはコストが高いです。まず壊れたら最も困る接続だけを対象にします。
Phase 2: 新規ジョブ追加時のルール化
「新しいジョブを追加する場合、接続定義の更新を必須とする」というルールをCIに組み込みます。
1def test_new_jobs_have_connections():2 """レジストリに登録されたジョブが接続定義に含まれているか"""3 registered_jobs = set(registry.keys())4 connected_jobs = set()5 for conn in connections:6 connected_jobs.add(conn["producer"]["job"])7 connected_jobs.add(conn["consumer"]["job"])8
9 unconnected = registered_jobs - connected_jobs10 assert not unconnected, f"接続未定義のジョブ: {unconnected}"Phase 3: 全接続の網羅
既存ジョブも順次接続定義に追加し、最終的に全接続をカバーします。
まとめ
| 検証レベル | 対象 | 検出できるバグ |
|---|---|---|
| 単体テスト | 個別ジョブ | ロジックのバグ |
| 名前一致性テスト | ジョブ間の接続 | パス不一致、カラム不一致 |
| 結合テスト | パイプライン全体 | データフローの異常 |
パイプラインのバグは「つなぎ目」で起きます。そして「つなぎ目」は暗黙知になりやすいです。接続定義を明示的なファイルにして、CIで自動検証する。これだけで結合バグの大半を事前に潰せます。
「コードを書く前に、まず接続を定義する」——このワークフローが定着すれば、パイプラインの信頼性は根本的に変わります。