45395 - シコウサクゴ -

名前一致性CIテスト:データパイプラインの「つなぎ目」を自動検証する

2026-04-12
AI駆動開発
AI駆動開発
CI
テスト
パイプライン
Python
pytest
Last updated:2026-04-12
7 Minutes
1393 Words

データパイプラインで最も厄介なバグは、ジョブ単体では検出できない「つなぎ目」の不整合です。ジョブAがoutput_daily.parquetを出力し、ジョブBがoutput_Daily.parquetを読む——大文字小文字が違うだけで、macOSでは動くがLinuxでは壊れます。

こうした「名前の不一致」をCIで自動検出する仕組みを構築しました。

問題:パイプラインの「つなぎ目」で起きるバグ

典型的な不一致パターン

1
ジョブA: output → /data/daily_report.parquet
2
ジョブB: input ← /data/daily-report.parquet ← アンダースコアとハイフン

ジョブAもジョブBも単体テストは通ります。しかし結合すると壊れます。

実際に遭遇した不一致のバリエーションを紹介します。

種類上流の出力下流の入力問題
ケース違いDaily.parquetdaily.parquetmacOSは通る、Linuxで失敗
区切り文字daily_reportdaily-report単純なタイポ
日付形式2026-04-12.parquet20260412.parquetフォーマット不一致
ディレクトリ/data/v2/output.csv/data/output.csvパス変更の伝搬漏れ
カラム名user_id (出力)userId (入力)命名規則の不統一

なぜ単体テストでは見つからないか

1
# ジョブAのテスト
2
def test_job_a_output():
3
result = job_a.run()
4
assert result.path.exists() # ✅ ファイルは生成された
5
assert len(result.data) > 0 # ✅ データも入っている
6
7
# ジョブBのテスト
8
def test_job_b_processing():
9
mock_input = create_test_data() # テスト用のモックデータ
10
result = job_b.process(mock_input)
11
assert result.is_valid() # ✅ 処理ロジックは正しい

どちらのテストも合格します。しかしジョブAの実際の出力パスとジョブBの実際の入力パスが一致しているかは、どちらのテストも検証していません。

解決策:接続定義の一元管理 + CIテスト

設計思想

パイプラインの「つなぎ目」を暗黙の知識にせず、明示的な接続定義として管理します。

1
暗黙: ジョブAのコード内のパスとジョブBのコード内のパスが「たまたま一致」
2
明示: 接続定義ファイルで「ジョブAの出力 = ジョブBの入力」を宣言

接続定義ファイル

pipeline_connections.yaml
1
connections:
2
- name: "daily_data_flow"
3
producer:
4
job: "daily_fetch"
5
output_key: "daily_data" # ジョブ内で定義された出力キー
6
consumer:
7
job: "daily_aggregation"
8
input_key: "raw_data" # ジョブ内で定義された入力キー
9
10
- name: "aggregation_to_report"
11
producer:
12
job: "daily_aggregation"
13
output_key: "summary"
14
consumer:
2 collapsed lines
15
job: "report_generator"
16
input_key: "aggregated_data"

ジョブ側の定義

各ジョブは入出力をレジストリに登録します。

1
# パイプラインレジストリ
2
PIPELINE_REGISTRY: dict[str, dict[str, str]] = {}
3
4
def register_output(job_name: str, key: str, path: str):
5
PIPELINE_REGISTRY.setdefault(job_name, {})
6
PIPELINE_REGISTRY[job_name][f"output:{key}"] = path
7
8
def register_input(job_name: str, key: str, path: str):
9
PIPELINE_REGISTRY.setdefault(job_name, {})
10
PIPELINE_REGISTRY[job_name][f"input:{key}"] = path
11
12
# ジョブAでの使用
13
register_output("daily_fetch", "daily_data", "/data/daily/{date}.parquet")
14
15
# ジョブBでの使用
1 collapsed line
16
register_input("daily_aggregation", "raw_data", "/data/daily/{date}.parquet")

CIテスト: 名前一致性の検証

1
import yaml
2
import pytest
3
4
def load_connections() -> list[dict]:
5
with open("pipeline_connections.yaml") as f:
6
return yaml.safe_load(f)["connections"]
7
8
def load_registry() -> dict:
9
# 全ジョブのレジストリをインポートして収集
10
import importlib
11
for module_name in discover_job_modules():
12
importlib.import_module(module_name)
13
return PIPELINE_REGISTRY
14
15
@pytest.fixture
64 collapsed lines
16
def connections():
17
return load_connections()
18
19
@pytest.fixture
20
def registry():
21
return load_registry()
22
23
def test_all_connections_have_matching_paths(connections, registry):
24
"""全接続の入出力パスが一致することを検証"""
25
errors = []
26
for conn in connections:
27
producer_job = conn["producer"]["job"]
28
producer_key = f"output:{conn['producer']['output_key']}"
29
consumer_job = conn["consumer"]["job"]
30
consumer_key = f"input:{conn['consumer']['input_key']}"
31
32
producer_path = registry.get(producer_job, {}).get(producer_key)
33
consumer_path = registry.get(consumer_job, {}).get(consumer_key)
34
35
if producer_path is None:
36
errors.append(f"{conn['name']}: producer {producer_job}.{producer_key} が未登録")
37
elif consumer_path is None:
38
errors.append(f"{conn['name']}: consumer {consumer_job}.{consumer_key} が未登録")
39
elif producer_path != consumer_path:
40
errors.append(
41
f"{conn['name']}: パス不一致\n"
42
f" producer: {producer_path}\n"
43
f" consumer: {consumer_path}"
44
)
45
46
assert not errors, "\n".join(errors)
47
48
def test_no_orphan_outputs(connections, registry):
49
"""消費者のいない出力を検出"""
50
connected_outputs = {
51
(c["producer"]["job"], c["producer"]["output_key"])
52
for c in connections
53
}
54
all_outputs = {
55
(job, key.split(":")[1])
56
for job, entries in registry.items()
57
for key in entries
58
if key.startswith("output:")
59
}
60
orphans = all_outputs - connected_outputs
61
if orphans:
62
# 警告のみ(最終出力は消費者がいなくて正常)
63
for job, key in orphans:
64
print(f"⚠️ 未接続の出力: {job}.{key}")
65
66
def test_no_dangling_inputs(connections, registry):
67
"""供給者のいない入力を検出"""
68
connected_inputs = {
69
(c["consumer"]["job"], c["consumer"]["input_key"])
70
for c in connections
71
}
72
all_inputs = {
73
(job, key.split(":")[1])
74
for job, entries in registry.items()
75
for key in entries
76
if key.startswith("input:")
77
}
78
dangling = all_inputs - connected_inputs
79
assert not dangling, f"供給者のない入力: {dangling}"

CI実行結果

1
$ pytest tests/test_pipeline_connections.py -v
2
test_all_connections_have_matching_paths PASSED
3
test_no_orphan_outputs PASSED
4
test_no_dangling_inputs PASSED
5
6
# パス不一致がある場合:
7
test_all_connections_have_matching_paths FAILED
8
daily_data_flow: パス不一致
9
producer: /data/daily/{date}.parquet
10
consumer: /data/Daily/{date}.parquet

追加の検証: カラム名の一致性

ファイルパスだけでなく、データのカラム名もつなぎ目で不整合が起きやすいです。

1
def test_column_compatibility(connections, registry):
2
"""上流の出力カラムが下流の必須カラムを満たすことを検証"""
3
for conn in connections:
4
if "expected_columns" not in conn.get("consumer", {}):
5
continue
6
7
producer_job = conn["producer"]["job"]
8
expected = set(conn["consumer"]["expected_columns"])
9
10
# テストデータで実際にジョブを実行して出力カラムを取得
11
actual_columns = run_job_and_get_columns(producer_job)
12
missing = expected - actual_columns
13
14
assert not missing, (
15
f"{conn['name']}: カラム不足 — "
2 collapsed lines
16
f"下流が期待: {expected}, 上流に不在: {missing}"
17
)

段階的な導入方法

Phase 1: 最もクリティカルな接続3つだけ

いきなり全接続を定義するのはコストが高いです。まず壊れたら最も困る接続だけを対象にします。

Phase 2: 新規ジョブ追加時のルール化

「新しいジョブを追加する場合、接続定義の更新を必須とする」というルールをCIに組み込みます。

1
def test_new_jobs_have_connections():
2
"""レジストリに登録されたジョブが接続定義に含まれているか"""
3
registered_jobs = set(registry.keys())
4
connected_jobs = set()
5
for conn in connections:
6
connected_jobs.add(conn["producer"]["job"])
7
connected_jobs.add(conn["consumer"]["job"])
8
9
unconnected = registered_jobs - connected_jobs
10
assert not unconnected, f"接続未定義のジョブ: {unconnected}"

Phase 3: 全接続の網羅

既存ジョブも順次接続定義に追加し、最終的に全接続をカバーします。

まとめ

検証レベル対象検出できるバグ
単体テスト個別ジョブロジックのバグ
名前一致性テストジョブ間の接続パス不一致、カラム不一致
結合テストパイプライン全体データフローの異常

パイプラインのバグは「つなぎ目」で起きます。そして「つなぎ目」は暗黙知になりやすいです。接続定義を明示的なファイルにして、CIで自動検証する。これだけで結合バグの大半を事前に潰せます。

「コードを書く前に、まず接続を定義する」——このワークフローが定着すれば、パイプラインの信頼性は根本的に変わります。

Article title:名前一致性CIテスト:データパイプラインの「つなぎ目」を自動検証する
Article author:45395
Release time:2026-04-12

記事へのご質問・ご感想をお聞かせください

フィードバックを送る