45395 - シコウサクゴ -

上流データの stale 検知と代替ソース自動切替:fallback チェーンで「データが古い」を運用で吸収する

2026-05-08
AI駆動開発
AI駆動開発
データ品質
フォールバック
Python
外部API
運用設計
Last updated:2026-05-07
10 Minutes
1869 Words

外部 API から時系列データを取得して下流に流す処理で、API 側が更新を止めているのに気づかず古いデータで分析を続ける事故が何度かありました。

「最新データだと思って動いていたのに、実は 2 日前のデータだった」という状態は、後段の判定すべてを汚染します。鮮度判定と代替ソースへの自動切替(fallback チェーン)でこれを防いだ記録です。

何が起きたか

古いデータで動き続ける

外部 API のレスポンスは正常(200 OK)、データもパースできる。ただ、中身の最終更新日時が 2 日前で止まっていた

1
def fetch_latest():
2
resp = requests.get("https://api.example.com/latest")
3
resp.raise_for_status()
4
return resp.json()

resp.raise_for_status() は 200 OK なので通る。データの 中身 に対する検証が無いと、古いデータでも何事もなかったように下流に流れます。

影響範囲

下流のすべての処理が古いデータで動きます。

  • 集計値が「昨日の値と同じ」になり続ける(誰も気づかない)
  • アラートのベースライン計算が止まる
  • ダッシュボードが「凍結された数字」を表示し続ける

「データが古い」ことを下流の指標で気づくのは難しいです。取得時点で鮮度判定するしかありません。

鮮度判定の設計

何を「stale」とみなすか

データの種類によって「許容できる古さ」が違います。

config/freshness.yml
1
sources:
2
realtime_metric:
3
max_age_minutes: 5 # 5 分超は stale
4
hourly_aggregate:
5
max_age_minutes: 90 # 90 分(1.5h)まで許容
6
daily_summary:
7
max_age_minutes: 1500 # 25h(日次なので翌日少し過ぎても許容)

鮮度判定関数

1
from datetime import datetime, timedelta
2
from dataclasses import dataclass
3
4
@dataclass
5
class FreshnessResult:
6
is_fresh: bool
7
age_minutes: float
8
threshold_minutes: float
9
data_timestamp: datetime
10
11
def check_freshness(data: dict, source_name: str) -> FreshnessResult:
12
"""データの中身のタイムスタンプから鮮度を判定"""
13
cfg = freshness_config.get(source_name)
14
threshold = cfg.max_age_minutes
15
14 collapsed lines
16
# データに含まれるタイムスタンプを取得
17
ts_str = data.get("last_updated") or data.get("timestamp")
18
if not ts_str:
19
raise ValueError(f"{source_name}: timestamp field missing")
20
21
data_ts = datetime.fromisoformat(ts_str)
22
age = (datetime.now(data_ts.tzinfo) - data_ts).total_seconds() / 60
23
24
return FreshnessResult(
25
is_fresh=age <= threshold,
26
age_minutes=age,
27
threshold_minutes=threshold,
28
data_timestamp=data_ts,
29
)

取得時に鮮度判定

1
def fetch_with_freshness_check(source_name: str) -> dict:
2
data = _fetch(source_name)
3
result = check_freshness(data, source_name)
4
5
if not result.is_fresh:
6
logger.warning(
7
f"{source_name} is stale: "
8
f"age={result.age_minutes:.0f}min > threshold={result.threshold_minutes}min"
9
)
10
raise StaleDataError(source_name, result)
11
12
return data

raise するのは強気に見えますが、stale データで下流処理を続けるよりは止めたほうが安全です。ただし止めるだけだと運用が回らないので、次に fallback を入れます。

Fallback チェーン

概念

「優先順位付きの代替ソース」を並べて、上から順に試します。

1
SOURCE_CHAIN = [
2
"primary_api", # 最優先
3
"secondary_api", # primary が stale なら
4
"cached_snapshot", # secondary も stale なら(数時間前のキャッシュ)
5
"manual_override", # 全部 stale なら(運用者が手動で置く)
6
]
7
8
def fetch_with_fallback(metric_name: str) -> tuple[dict, str]:
9
"""優先順に試して、最初に fresh だったソースを返す"""
10
last_error = None
11
for source in SOURCE_CHAIN:
12
try:
13
data = fetch_with_freshness_check(f"{source}.{metric_name}")
14
return data, source
15
except (StaleDataError, requests.RequestException) as e:
5 collapsed lines
16
logger.warning(f"{source} failed: {e}")
17
last_error = e
18
19
# 全部失敗
20
raise AllSourcesStaleError(metric_name, last_error)

Fallback が発動したら通知

「primary が stale で secondary に切り替わった」状態は 異常ではないが 要観察です。発動したら必ず Slack 通知します。

1
def fetch_with_fallback_and_notify(metric_name: str):
2
data, used_source = fetch_with_fallback(metric_name)
3
if used_source != SOURCE_CHAIN[0]:
4
send_slack(
5
f"⚠️ {metric_name}{used_source} を使用 "
6
f"(primary stale or down)"
7
)
8
return data, used_source

キャッシュ(中間スナップショット)

cached_snapshot は、直近の fresh データを定期的に保存しておく仕組みです。primary も secondary も死んだ場合の最終手段として使います。

1
SNAPSHOT_DIR = Path("/var/data/snapshots")
2
3
def save_snapshot(metric_name: str, data: dict):
4
"""fresh なデータを取得できたら snapshot として保存"""
5
path = SNAPSHOT_DIR / f"{metric_name}.json"
6
path.write_text(json.dumps({
7
"data": data,
8
"saved_at": datetime.now().isoformat(),
9
}))
10
11
def fetch_from_snapshot(metric_name: str) -> dict:
12
path = SNAPSHOT_DIR / f"{metric_name}.json"
13
if not path.exists():
14
raise FileNotFoundError
15
snapshot = json.loads(path.read_text())
5 collapsed lines
16
saved_at = datetime.fromisoformat(snapshot["saved_at"])
17
age_h = (datetime.now() - saved_at).total_seconds() / 3600
18
if age_h > 24:
19
raise StaleSnapshotError(f"snapshot is {age_h:.0f}h old")
20
return snapshot["data"]

24 時間より古い snapshot は使わない、という制限を入れます。snapshot 自体が腐っていたら意味がないので。

切替時の整合性チェック

異なるソースで値がズレる問題

primary と secondary で データの形式が微妙に違う ケースがあります。

  • 単位が違う(円 vs ドル、KB vs MB)
  • カラム名が違う(value vs metric_value
  • スキーマが違う(フラット vs ネスト)

切り替えた瞬間に下流のロジックが壊れます。正規化レイヤーを挟みます。

1
def normalize(data: dict, source: str) -> dict:
2
"""ソースごとに異なる形式を共通フォーマットに正規化"""
3
if source.startswith("primary"):
4
return {
5
"timestamp": data["last_updated"],
6
"value": data["value"],
7
}
8
elif source.startswith("secondary"):
9
return {
10
"timestamp": data["timestamp"],
11
"value": data["metric_value"] / 1000, # 単位変換
12
}
13
elif source.startswith("cached"):
14
return data # snapshot は既に正規化済み
15
else:
1 collapsed line
16
raise ValueError(f"unknown source: {source}")

値のレンジチェック

切り替え後の値が 直前の値から極端に変わっていないか 確認します。

1
def sanity_check(new_data: dict, last_known_data: dict | None) -> bool:
2
if last_known_data is None:
3
return True
4
new_v = new_data["value"]
5
last_v = last_known_data["value"]
6
if last_v == 0:
7
return new_v < 1.0 # 急に大きな値は怪しい
8
ratio = new_v / last_v
9
return 0.5 <= ratio <= 2.0 # 半分〜2 倍の範囲

範囲外なら警告を上げて、人間に判断を仰ぎます。

運用してみての気づき

効いた点

1. stale を「気づける」ようになった

事故の原因の大半が「気づかなかった」だったので、取得時に必ず鮮度判定を通すだけで効果が大きかった。

2. 一時的な障害で全停止しなくなった

primary API が 30 分メンテで死んでも、secondary に切り替わって動き続けます。「メンテ中だから止まる」のは前は当たり前でしたが、now は通知だけで動き続けます。

3. snapshot のありがたみ

primary も secondary も同時に死ぬケースは滅多にないですが、起きたときに snapshot で 1 時間程度なら持ちこたえられます。「データソースの最終的な保険」として機能しました。

罠 1: snapshot を保存し忘れる

fallback の分岐に入った後で save_snapshot を呼んでいたので、primary が一度も成功しなかったときに snapshot が更新されない。

修正: primary 成功時に必ず snapshot 保存。

1
data = fetch_with_freshness_check("primary_api")
2
save_snapshot(metric_name, data) # 取得直後に保存

罠 2: 単位ミスを見逃す

normalization のロジックを最初に書いたとき、secondary の単位変換を間違えて「primary が KB、secondary が KB → MB に誤変換」していました。値が突然 1000 倍に。

修正: sanity_check のレンジを厳しめに設定し、切替直後は 1 サイクル分は警告のみで使わないようにする。

罠 3: fallback 通知の頻度

fallback が発動するたびに通知すると、primary が長時間死んでいるケースで Slack が埋まります。

修正: 「同じ理由の通知は 1 時間に 1 回まで」にする。

まとめ

レイヤー役割
鮮度判定データの中身の timestamp で stale を検知
Fallback チェーンprimary → secondary → cached → manual の優先順位
正規化レイヤーソース間の形式差を吸収
Sanity check切替時の値の妥当性を確認
通知fallback 発動時に Slack で要観察マーク

外部データに依存するシステムで、「データソースが完璧」と仮定するのは危険です。

鮮度判定 + fallback チェーン + 正規化 のセットを組むと、データソース側の障害をシステム全体の障害にしない柔軟性が生まれます。実装はそれなりに重いですが、長期運用では負担を回収できる投資でした。

Article title:上流データの stale 検知と代替ソース自動切替:fallback チェーンで「データが古い」を運用で吸収する
Article author:45395
Release time:2026-05-08

記事へのご質問・ご感想をお聞かせください

フィードバックを送る