バッチ処理やデータパイプラインで「exit code 0なのに実は壊れていた」という事故を経験したことはないでしょうか。プロセスは正常終了した。しかし出力ファイルが0件だった。あるいは、前日のデータがそのまま残っていただけだった。
この問題を解決するために、期待する出力をYAMLで宣言的に定義し、実行後に自動検証する仕組みを構築しました。コードを1行も書かずに「このジョブは何を生成すべきか」を定義できます。
問題:exit code 0 ≠ 正常動作
なぜexit codeだけでは不十分か
1$ python pipeline.py && echo "SUCCESS"2SUCCESSこの「SUCCESS」は何を保証しているでしょうか。
- ○ プロセスが異常終了しなかった
- × 期待するファイルが生成された
- × ファイルの中身が空でない
- × データの鮮度が24時間以内
- × 下流のジョブが読める形式
実際に遭遇した事故パターンを紹介します。
| 事故 | exit code | 実際の状態 |
|---|---|---|
| API制限でデータ取得0件 | 0 | 空のファイルが生成 |
| 例外をcatchして握りつぶし | 0 | 処理が途中で中断、部分的なデータ |
| 環境変数未設定でデフォルトパス使用 | 0 | 別ディレクトリに出力 |
| 前回の出力が残っていてスキップ | 0 | 古いデータのまま |
検証コードの増殖問題
最初は各ジョブのスクリプト末尾に検証コードを書いていました。
1# job_a.py の末尾2output = Path("/data/output_a.parquet")3if not output.exists():4 raise FileNotFoundError(f"出力がありません: {output}")5if output.stat().st_size < 1000:6 raise ValueError(f"出力が小さすぎます: {output.stat().st_size} bytes")これが数十ジョブに広がると問題が顕在化します。
- 検証ロジックがジョブごとにバラバラ
- 閾値(1000 bytes? 10000 bytes?)の根拠が不明
- 新しいジョブを追加するたびに検証コードのコピペ
- 「検証コード自体のテスト」という無限後退
解決策:YAMLで期待出力を宣言する
設計思想
検証ロジックをジョブのコードから分離し、宣言的な設定ファイルに集約します。
1ジョブのコード(what to do) ← 分離 → 期待出力の定義(what to expect)ジョブは「処理」に集中し、「検証」は外部の汎用バリデーターが担います。
YAMLスキーマの設計
1version: 12
3jobs:4 daily_data_fetch:5 description: "外部APIからデータを取得して保存"6 outputs:7 - path: "/data/daily/{date}.parquet"8 checks:9 exists: true10 min_size_bytes: 500011 max_age_hours: 25 # 日次ジョブなので25時間以内12 min_rows: 100 # Parquetの場合、行数も検証13 format: "parquet"14
15 collapsed lines
15 hourly_aggregation:16 description: "日次データを集計してサマリーを生成"17 outputs:18 - path: "/data/summary/latest.csv"19 checks:20 exists: true21 min_size_bytes: 50022 max_age_hours: 223 format: "csv"24 required_columns: ["timestamp", "value", "count"]25 - path: "/data/summary/latest.json"26 checks:27 exists: true28 min_size_bytes: 10029 max_age_hours: 2バリデーターの実装
YAMLを読み、各チェック項目を順に検証する汎用スクリプトです。
1import yaml2from pathlib import Path3from datetime import datetime, timedelta4import pyarrow.parquet as pq5import csv6
7def validate_job(job_name: str, config_path: str) -> list[str]:8 """ジョブの出力を検証し、失敗した項目のリストを返す"""9 with open(config_path) as f:10 config = yaml.safe_load(f)11
12 job = config["jobs"].get(job_name)13 if not job:14 return [f"未定義のジョブ: {job_name}"]15
47 collapsed lines
16 errors = []17 today = datetime.now().strftime("%Y-%m-%d")18
19 for output in job["outputs"]:20 path = Path(output["path"].replace("{date}", today))21 checks = output["checks"]22
23 # 存在チェック24 if checks.get("exists") and not path.exists():25 errors.append(f"ファイルが存在しない: {path}")26 continue # 以降のチェックはスキップ27
28 # サイズチェック29 if "min_size_bytes" in checks:30 size = path.stat().st_size31 if size < checks["min_size_bytes"]:32 errors.append(33 f"サイズ不足: {path} ({size} < {checks['min_size_bytes']} bytes)"34 )35
36 # 鮮度チェック37 if "max_age_hours" in checks:38 mtime = datetime.fromtimestamp(path.stat().st_mtime)39 age_hours = (datetime.now() - mtime).total_seconds() / 360040 if age_hours > checks["max_age_hours"]:41 errors.append(42 f"データが古い: {path} ({age_hours:.1f}h > {checks['max_age_hours']}h)"43 )44
45 # 行数チェック(Parquet)46 if checks.get("format") == "parquet" and "min_rows" in checks:47 table = pq.read_table(path)48 if len(table) < checks["min_rows"]:49 errors.append(50 f"行数不足: {path} ({len(table)} < {checks['min_rows']})"51 )52
53 # カラムチェック(CSV)54 if checks.get("format") == "csv" and "required_columns" in checks:55 with open(path) as csvfile:56 reader = csv.reader(csvfile)57 headers = next(reader)58 missing = set(checks["required_columns"]) - set(headers)59 if missing:60 errors.append(f"カラム不足: {path} — {missing}")61
62 return errors実行と結果
1$ python validate_pipeline.py --job daily_data_fetch2✅ daily_data_fetch: 全チェックOK (3/3 passed)3
4$ python validate_pipeline.py --job hourly_aggregation5❌ hourly_aggregation: 1件の問題6 - データが古い: /data/summary/latest.csv (26.3h > 2h)この設計が解決する3つの問題
1. 検証ロジックの散在 → 1ファイルに集約
Before: 各ジョブのスクリプト末尾に検証コードが散らばっている
After: pipeline_expectations.yaml を見れば全ジョブの期待出力が一覧できます
2. 新ジョブ追加のコスト → YAMLに数行追加するだけ
Before: 検証コードをコピペして閾値を書き換え After: YAMLにエントリを追加(コード変更ゼロ)
1 new_job:2 description: "新しいジョブの説明"3 outputs:4 - path: "/data/new_output.parquet"5 checks:6 exists: true7 min_size_bytes: 10008 max_age_hours: 253. 検証漏れの発見 → CI/CDに組み込み
1# CI設定(例)2steps:3 - name: Validate pipeline outputs4 run: python validate_pipeline.py --all --fail-on-errorYAMLに定義されていないジョブ = 検証されていないジョブです。逆に言えば、YAMLにジョブを追加し忘れると「未検証ジョブ」として検出できます。
実践で学んだこと
閾値の決め方
最初は「なんとなく」で設定して、すぐに破綻しました。
失敗例: min_size_bytes: 10000 と設定 → 祝日にデータが少なくて正常なのにアラート
改善: 過去30日分のデータから統計値を取り、平均 - 2σ を下限にしました。
1# 閾値の自動算出(初回セットアップ時に実行)2import statistics3
4sizes = [f.stat().st_size for f in Path("/data/daily/").glob("*.parquet")]5mean = statistics.mean(sizes)6stdev = statistics.stdev(sizes)7suggested_min = int(mean - 2 * stdev)8print(f"推奨 min_size_bytes: {suggested_min}")YAMLのバージョン管理
pipeline_expectations.yaml をGit管理に含めることで、以下のメリットがあります。
- ジョブの期待出力の変更履歴が追える
- PRレビューで「この閾値変更は妥当か?」を議論できる
- 意図しない閾値変更を検出できる
段階的な導入
一気に全ジョブに適用するのではなく、以下の順序で導入しました。
- 最も重要なジョブ3つに適用(事故があったジョブ優先)
- existsチェックだけで開始(最小限のYAML)
- 事故が発生するたびに、そのチェック項目をYAMLに追加
- 1ヶ月後に全ジョブに展開
まとめ
| 観点 | Before | After |
|---|---|---|
| 検証の場所 | 各スクリプトの末尾 | YAML 1ファイル |
| 新ジョブ追加 | コードのコピペ | YAML数行追加 |
| 期待出力の可視性 | コードを読まないとわからない | YAMLで一覧 |
| 検証漏れの検出 | 不可能 | YAML未定義 = 未検証 |
| 閾値の変更履歴 | なし | Git履歴で追跡 |
「exit code 0 = 正常」という思い込みを捨てて、期待する出力を明示的に宣言する。この発想の転換だけで、データパイプラインの信頼性は大幅に向上します。
コードで「何をするか」を書き、YAMLで「何が生まれるべきか」を書く。この分離が、検証を持続可能にする鍵です。