45395 - シコウサクゴ -

YAML宣言型の意図検証:期待出力をコードではなく設定で定義する

2026-04-12
AI駆動開発
AI駆動開発
YAML
意図検証
パイプライン
CI
Python
Last updated:2026-04-12
9 Minutes
1656 Words

バッチ処理やデータパイプラインで「exit code 0なのに実は壊れていた」という事故を経験したことはないでしょうか。プロセスは正常終了した。しかし出力ファイルが0件だった。あるいは、前日のデータがそのまま残っていただけだった。

この問題を解決するために、期待する出力をYAMLで宣言的に定義し、実行後に自動検証する仕組みを構築しました。コードを1行も書かずに「このジョブは何を生成すべきか」を定義できます。

問題:exit code 0 ≠ 正常動作

なぜexit codeだけでは不十分か

1
$ python pipeline.py && echo "SUCCESS"
2
SUCCESS

この「SUCCESS」は何を保証しているでしょうか。

  • ○ プロセスが異常終了しなかった
  • × 期待するファイルが生成された
  • × ファイルの中身が空でない
  • × データの鮮度が24時間以内
  • × 下流のジョブが読める形式

実際に遭遇した事故パターンを紹介します。

事故exit code実際の状態
API制限でデータ取得0件0空のファイルが生成
例外をcatchして握りつぶし0処理が途中で中断、部分的なデータ
環境変数未設定でデフォルトパス使用0別ディレクトリに出力
前回の出力が残っていてスキップ0古いデータのまま

検証コードの増殖問題

最初は各ジョブのスクリプト末尾に検証コードを書いていました。

1
# job_a.py の末尾
2
output = Path("/data/output_a.parquet")
3
if not output.exists():
4
raise FileNotFoundError(f"出力がありません: {output}")
5
if output.stat().st_size < 1000:
6
raise ValueError(f"出力が小さすぎます: {output.stat().st_size} bytes")

これが数十ジョブに広がると問題が顕在化します。

  • 検証ロジックがジョブごとにバラバラ
  • 閾値(1000 bytes? 10000 bytes?)の根拠が不明
  • 新しいジョブを追加するたびに検証コードのコピペ
  • 「検証コード自体のテスト」という無限後退

解決策:YAMLで期待出力を宣言する

設計思想

検証ロジックをジョブのコードから分離し、宣言的な設定ファイルに集約します。

1
ジョブのコード(what to do) ← 分離 → 期待出力の定義(what to expect)

ジョブは「処理」に集中し、「検証」は外部の汎用バリデーターが担います。

YAMLスキーマの設計

pipeline_expectations.yaml
1
version: 1
2
3
jobs:
4
daily_data_fetch:
5
description: "外部APIからデータを取得して保存"
6
outputs:
7
- path: "/data/daily/{date}.parquet"
8
checks:
9
exists: true
10
min_size_bytes: 5000
11
max_age_hours: 25 # 日次ジョブなので25時間以内
12
min_rows: 100 # Parquetの場合、行数も検証
13
format: "parquet"
14
15 collapsed lines
15
hourly_aggregation:
16
description: "日次データを集計してサマリーを生成"
17
outputs:
18
- path: "/data/summary/latest.csv"
19
checks:
20
exists: true
21
min_size_bytes: 500
22
max_age_hours: 2
23
format: "csv"
24
required_columns: ["timestamp", "value", "count"]
25
- path: "/data/summary/latest.json"
26
checks:
27
exists: true
28
min_size_bytes: 100
29
max_age_hours: 2

バリデーターの実装

YAMLを読み、各チェック項目を順に検証する汎用スクリプトです。

1
import yaml
2
from pathlib import Path
3
from datetime import datetime, timedelta
4
import pyarrow.parquet as pq
5
import csv
6
7
def validate_job(job_name: str, config_path: str) -> list[str]:
8
"""ジョブの出力を検証し、失敗した項目のリストを返す"""
9
with open(config_path) as f:
10
config = yaml.safe_load(f)
11
12
job = config["jobs"].get(job_name)
13
if not job:
14
return [f"未定義のジョブ: {job_name}"]
15
47 collapsed lines
16
errors = []
17
today = datetime.now().strftime("%Y-%m-%d")
18
19
for output in job["outputs"]:
20
path = Path(output["path"].replace("{date}", today))
21
checks = output["checks"]
22
23
# 存在チェック
24
if checks.get("exists") and not path.exists():
25
errors.append(f"ファイルが存在しない: {path}")
26
continue # 以降のチェックはスキップ
27
28
# サイズチェック
29
if "min_size_bytes" in checks:
30
size = path.stat().st_size
31
if size < checks["min_size_bytes"]:
32
errors.append(
33
f"サイズ不足: {path} ({size} < {checks['min_size_bytes']} bytes)"
34
)
35
36
# 鮮度チェック
37
if "max_age_hours" in checks:
38
mtime = datetime.fromtimestamp(path.stat().st_mtime)
39
age_hours = (datetime.now() - mtime).total_seconds() / 3600
40
if age_hours > checks["max_age_hours"]:
41
errors.append(
42
f"データが古い: {path} ({age_hours:.1f}h > {checks['max_age_hours']}h)"
43
)
44
45
# 行数チェック(Parquet)
46
if checks.get("format") == "parquet" and "min_rows" in checks:
47
table = pq.read_table(path)
48
if len(table) < checks["min_rows"]:
49
errors.append(
50
f"行数不足: {path} ({len(table)} < {checks['min_rows']})"
51
)
52
53
# カラムチェック(CSV)
54
if checks.get("format") == "csv" and "required_columns" in checks:
55
with open(path) as csvfile:
56
reader = csv.reader(csvfile)
57
headers = next(reader)
58
missing = set(checks["required_columns"]) - set(headers)
59
if missing:
60
errors.append(f"カラム不足: {path}{missing}")
61
62
return errors

実行と結果

Terminal window
1
$ python validate_pipeline.py --job daily_data_fetch
2
daily_data_fetch: 全チェックOK (3/3 passed)
3
4
$ python validate_pipeline.py --job hourly_aggregation
5
hourly_aggregation: 1件の問題
6
- データが古い: /data/summary/latest.csv (26.3h > 2h)

この設計が解決する3つの問題

1. 検証ロジックの散在 → 1ファイルに集約

Before: 各ジョブのスクリプト末尾に検証コードが散らばっている After: pipeline_expectations.yaml を見れば全ジョブの期待出力が一覧できます

2. 新ジョブ追加のコスト → YAMLに数行追加するだけ

Before: 検証コードをコピペして閾値を書き換え After: YAMLにエントリを追加(コード変更ゼロ)

1
new_job:
2
description: "新しいジョブの説明"
3
outputs:
4
- path: "/data/new_output.parquet"
5
checks:
6
exists: true
7
min_size_bytes: 1000
8
max_age_hours: 25

3. 検証漏れの発見 → CI/CDに組み込み

1
# CI設定(例)
2
steps:
3
- name: Validate pipeline outputs
4
run: python validate_pipeline.py --all --fail-on-error

YAMLに定義されていないジョブ = 検証されていないジョブです。逆に言えば、YAMLにジョブを追加し忘れると「未検証ジョブ」として検出できます。

実践で学んだこと

閾値の決め方

最初は「なんとなく」で設定して、すぐに破綻しました。

失敗例: min_size_bytes: 10000 と設定 → 祝日にデータが少なくて正常なのにアラート

改善: 過去30日分のデータから統計値を取り、平均 - 2σ を下限にしました。

1
# 閾値の自動算出(初回セットアップ時に実行)
2
import statistics
3
4
sizes = [f.stat().st_size for f in Path("/data/daily/").glob("*.parquet")]
5
mean = statistics.mean(sizes)
6
stdev = statistics.stdev(sizes)
7
suggested_min = int(mean - 2 * stdev)
8
print(f"推奨 min_size_bytes: {suggested_min}")

YAMLのバージョン管理

pipeline_expectations.yaml をGit管理に含めることで、以下のメリットがあります。

  • ジョブの期待出力の変更履歴が追える
  • PRレビューで「この閾値変更は妥当か?」を議論できる
  • 意図しない閾値変更を検出できる

段階的な導入

一気に全ジョブに適用するのではなく、以下の順序で導入しました。

  1. 最も重要なジョブ3つに適用(事故があったジョブ優先)
  2. existsチェックだけで開始(最小限のYAML)
  3. 事故が発生するたびに、そのチェック項目をYAMLに追加
  4. 1ヶ月後に全ジョブに展開

まとめ

観点BeforeAfter
検証の場所各スクリプトの末尾YAML 1ファイル
新ジョブ追加コードのコピペYAML数行追加
期待出力の可視性コードを読まないとわからないYAMLで一覧
検証漏れの検出不可能YAML未定義 = 未検証
閾値の変更履歴なしGit履歴で追跡

「exit code 0 = 正常」という思い込みを捨てて、期待する出力を明示的に宣言する。この発想の転換だけで、データパイプラインの信頼性は大幅に向上します。

コードで「何をするか」を書き、YAMLで「何が生まれるべきか」を書く。この分離が、検証を持続可能にする鍵です。

Article title:YAML宣言型の意図検証:期待出力をコードではなく設定で定義する
Article author:45395
Release time:2026-04-12

記事へのご質問・ご感想をお聞かせください

フィードバックを送る