外部 API から時系列データを取得して下流に流す処理で、API 側が更新を止めているのに気づかず古いデータで分析を続ける事故が何度かありました。
「最新データだと思って動いていたのに、実は 2 日前のデータだった」という状態は、後段の判定すべてを汚染します。鮮度判定と代替ソースへの自動切替(fallback チェーン)でこれを防いだ記録です。
何が起きたか
古いデータで動き続ける
外部 API のレスポンスは正常(200 OK)、データもパースできる。ただ、中身の最終更新日時が 2 日前で止まっていた。
1def fetch_latest():2 resp = requests.get("https://api.example.com/latest")3 resp.raise_for_status()4 return resp.json()resp.raise_for_status() は 200 OK なので通る。データの 中身 に対する検証が無いと、古いデータでも何事もなかったように下流に流れます。
影響範囲
下流のすべての処理が古いデータで動きます。
- 集計値が「昨日の値と同じ」になり続ける(誰も気づかない)
- アラートのベースライン計算が止まる
- ダッシュボードが「凍結された数字」を表示し続ける
「データが古い」ことを下流の指標で気づくのは難しいです。取得時点で鮮度判定するしかありません。
鮮度判定の設計
何を「stale」とみなすか
データの種類によって「許容できる古さ」が違います。
1sources:2 realtime_metric:3 max_age_minutes: 5 # 5 分超は stale4 hourly_aggregate:5 max_age_minutes: 90 # 90 分(1.5h)まで許容6 daily_summary:7 max_age_minutes: 1500 # 25h(日次なので翌日少し過ぎても許容)鮮度判定関数
1from datetime import datetime, timedelta2from dataclasses import dataclass3
4@dataclass5class FreshnessResult:6 is_fresh: bool7 age_minutes: float8 threshold_minutes: float9 data_timestamp: datetime10
11def check_freshness(data: dict, source_name: str) -> FreshnessResult:12 """データの中身のタイムスタンプから鮮度を判定"""13 cfg = freshness_config.get(source_name)14 threshold = cfg.max_age_minutes15
14 collapsed lines
16 # データに含まれるタイムスタンプを取得17 ts_str = data.get("last_updated") or data.get("timestamp")18 if not ts_str:19 raise ValueError(f"{source_name}: timestamp field missing")20
21 data_ts = datetime.fromisoformat(ts_str)22 age = (datetime.now(data_ts.tzinfo) - data_ts).total_seconds() / 6023
24 return FreshnessResult(25 is_fresh=age <= threshold,26 age_minutes=age,27 threshold_minutes=threshold,28 data_timestamp=data_ts,29 )取得時に鮮度判定
1def fetch_with_freshness_check(source_name: str) -> dict:2 data = _fetch(source_name)3 result = check_freshness(data, source_name)4
5 if not result.is_fresh:6 logger.warning(7 f"{source_name} is stale: "8 f"age={result.age_minutes:.0f}min > threshold={result.threshold_minutes}min"9 )10 raise StaleDataError(source_name, result)11
12 return dataraise するのは強気に見えますが、stale データで下流処理を続けるよりは止めたほうが安全です。ただし止めるだけだと運用が回らないので、次に fallback を入れます。
Fallback チェーン
概念
「優先順位付きの代替ソース」を並べて、上から順に試します。
1SOURCE_CHAIN = [2 "primary_api", # 最優先3 "secondary_api", # primary が stale なら4 "cached_snapshot", # secondary も stale なら(数時間前のキャッシュ)5 "manual_override", # 全部 stale なら(運用者が手動で置く)6]7
8def fetch_with_fallback(metric_name: str) -> tuple[dict, str]:9 """優先順に試して、最初に fresh だったソースを返す"""10 last_error = None11 for source in SOURCE_CHAIN:12 try:13 data = fetch_with_freshness_check(f"{source}.{metric_name}")14 return data, source15 except (StaleDataError, requests.RequestException) as e:5 collapsed lines
16 logger.warning(f"{source} failed: {e}")17 last_error = e18
19 # 全部失敗20 raise AllSourcesStaleError(metric_name, last_error)Fallback が発動したら通知
「primary が stale で secondary に切り替わった」状態は 異常ではないが 要観察です。発動したら必ず Slack 通知します。
1def fetch_with_fallback_and_notify(metric_name: str):2 data, used_source = fetch_with_fallback(metric_name)3 if used_source != SOURCE_CHAIN[0]:4 send_slack(5 f"⚠️ {metric_name} は {used_source} を使用 "6 f"(primary stale or down)"7 )8 return data, used_sourceキャッシュ(中間スナップショット)
cached_snapshot は、直近の fresh データを定期的に保存しておく仕組みです。primary も secondary も死んだ場合の最終手段として使います。
1SNAPSHOT_DIR = Path("/var/data/snapshots")2
3def save_snapshot(metric_name: str, data: dict):4 """fresh なデータを取得できたら snapshot として保存"""5 path = SNAPSHOT_DIR / f"{metric_name}.json"6 path.write_text(json.dumps({7 "data": data,8 "saved_at": datetime.now().isoformat(),9 }))10
11def fetch_from_snapshot(metric_name: str) -> dict:12 path = SNAPSHOT_DIR / f"{metric_name}.json"13 if not path.exists():14 raise FileNotFoundError15 snapshot = json.loads(path.read_text())5 collapsed lines
16 saved_at = datetime.fromisoformat(snapshot["saved_at"])17 age_h = (datetime.now() - saved_at).total_seconds() / 360018 if age_h > 24:19 raise StaleSnapshotError(f"snapshot is {age_h:.0f}h old")20 return snapshot["data"]24 時間より古い snapshot は使わない、という制限を入れます。snapshot 自体が腐っていたら意味がないので。
切替時の整合性チェック
異なるソースで値がズレる問題
primary と secondary で データの形式が微妙に違う ケースがあります。
- 単位が違う(円 vs ドル、KB vs MB)
- カラム名が違う(
valuevsmetric_value) - スキーマが違う(フラット vs ネスト)
切り替えた瞬間に下流のロジックが壊れます。正規化レイヤーを挟みます。
1def normalize(data: dict, source: str) -> dict:2 """ソースごとに異なる形式を共通フォーマットに正規化"""3 if source.startswith("primary"):4 return {5 "timestamp": data["last_updated"],6 "value": data["value"],7 }8 elif source.startswith("secondary"):9 return {10 "timestamp": data["timestamp"],11 "value": data["metric_value"] / 1000, # 単位変換12 }13 elif source.startswith("cached"):14 return data # snapshot は既に正規化済み15 else:1 collapsed line
16 raise ValueError(f"unknown source: {source}")値のレンジチェック
切り替え後の値が 直前の値から極端に変わっていないか 確認します。
1def sanity_check(new_data: dict, last_known_data: dict | None) -> bool:2 if last_known_data is None:3 return True4 new_v = new_data["value"]5 last_v = last_known_data["value"]6 if last_v == 0:7 return new_v < 1.0 # 急に大きな値は怪しい8 ratio = new_v / last_v9 return 0.5 <= ratio <= 2.0 # 半分〜2 倍の範囲範囲外なら警告を上げて、人間に判断を仰ぎます。
運用してみての気づき
効いた点
1. stale を「気づける」ようになった
事故の原因の大半が「気づかなかった」だったので、取得時に必ず鮮度判定を通すだけで効果が大きかった。
2. 一時的な障害で全停止しなくなった
primary API が 30 分メンテで死んでも、secondary に切り替わって動き続けます。「メンテ中だから止まる」のは前は当たり前でしたが、now は通知だけで動き続けます。
3. snapshot のありがたみ
primary も secondary も同時に死ぬケースは滅多にないですが、起きたときに snapshot で 1 時間程度なら持ちこたえられます。「データソースの最終的な保険」として機能しました。
罠
罠 1: snapshot を保存し忘れる
fallback の分岐に入った後で save_snapshot を呼んでいたので、primary が一度も成功しなかったときに snapshot が更新されない。
修正: primary 成功時に必ず snapshot 保存。
1data = fetch_with_freshness_check("primary_api")2save_snapshot(metric_name, data) # 取得直後に保存罠 2: 単位ミスを見逃す
normalization のロジックを最初に書いたとき、secondary の単位変換を間違えて「primary が KB、secondary が KB → MB に誤変換」していました。値が突然 1000 倍に。
修正: sanity_check のレンジを厳しめに設定し、切替直後は 1 サイクル分は警告のみで使わないようにする。
罠 3: fallback 通知の頻度
fallback が発動するたびに通知すると、primary が長時間死んでいるケースで Slack が埋まります。
修正: 「同じ理由の通知は 1 時間に 1 回まで」にする。
まとめ
| レイヤー | 役割 |
|---|---|
| 鮮度判定 | データの中身の timestamp で stale を検知 |
| Fallback チェーン | primary → secondary → cached → manual の優先順位 |
| 正規化レイヤー | ソース間の形式差を吸収 |
| Sanity check | 切替時の値の妥当性を確認 |
| 通知 | fallback 発動時に Slack で要観察マーク |
外部データに依存するシステムで、「データソースが完璧」と仮定するのは危険です。
鮮度判定 + fallback チェーン + 正規化 のセットを組むと、データソース側の障害をシステム全体の障害にしない柔軟性が生まれます。実装はそれなりに重いですが、長期運用では負担を回収できる投資でした。