データ処理パイプラインで、1時間制限の長時間タスクが複数の外部APIを呼ぶ構成でした。あるAPIがハングすると、全体のタイムアウトまで他のAPI接続が閉じられず、FD・メモリ・DBコネクションが溜まり続ける問題が発生。
タイムアウトを親から子へ正しく伝播させるための設計パターンと、実装で踏んだ落とし穴をまとめます。
問題:タイムアウトは「自動的に伝播しない」
典型的なNGコード
1def run_pipeline(timeout_sec: int = 3600):2 data_a = fetch_from_api_a() # 内部でrequests.get(timeout=10)3 data_b = fetch_from_api_b() # 内部でrequests.get(timeout=10)4 data_c = heavy_computation() # CPU処理5 return merge(data_a, data_b, data_c)このコードにはtimeout_sec=3600の引数がありますが、どこにも使われていません。個別のAPIはtimeout=10で制限されていますが、全体がどれだけ時間を食ってもそれ自体は止まりません。
何が起きるか
113:00:00 pipeline開始213:00:05 fetch_api_a 完了(5秒)313:00:15 fetch_api_b 完了(10秒)413:00:16 heavy_computation 開始513:58:00 heavy_computation まだ実行中614:00:00 親プロセスの外側から SIGTERM714:00:00 計算中のデータは破棄、DBコネクションは閉じられず残る外側(launchdやsystemd)からSIGTERMされる頃には、どこまで進んだか・何を中断すべきかがわからない状態になっています。
設計原則:デッドラインを持ち回る
タイムアウトを「残り時間」ではなく**「絶対時刻(デッドライン)」**として持ち回ります。
1import time2
3def run_pipeline(deadline: float):4 data_a = fetch_from_api_a(deadline)5 data_b = fetch_from_api_b(deadline)6 data_c = heavy_computation(deadline)7 return merge(data_a, data_b, data_c)8
9# 呼び出し側10deadline = time.monotonic() + 360011run_pipeline(deadline)各関数の中でremaining = deadline - time.monotonic()を計算し、その時間だけ外部API待ちをします。
1def fetch_from_api_a(deadline: float):2 remaining = deadline - time.monotonic()3 if remaining <= 0:4 raise TimeoutError("pipeline deadline exceeded")5 timeout = min(remaining, 10) # APIごとの上限とデッドラインの小さい方6 return requests.get(URL_A, timeout=timeout).json()time.monotonic()を使うのがポイント。time.time()は時計の調整で戻ることがありますが、monotonicは単調増加です。
キャンセルシグナルの伝播
threading.Eventでキャンセルを通知
長時間のCPU処理やループには、明示的なキャンセルチェックを入れます。
1import threading2
3def heavy_computation(deadline: float, cancel: threading.Event):4 result = []5 for i in range(10_000):6 if cancel.is_set() or time.monotonic() >= deadline:7 raise TimeoutError(f"cancelled at iteration {i}")8 result.append(process_item(i))9 return result10
11# 呼び出し側12cancel = threading.Event()13try:14 run_pipeline(deadline, cancel)15except TimeoutError:2 collapsed lines
16 cancel.set() # 他のスレッドにも伝播17 cleanup()Eventを全関数に渡すと引数が膨らむので、実際はコンテキストオブジェクトにまとめます。
1from dataclasses import dataclass2
3@dataclass4class RunContext:5 deadline: float6 cancel: threading.Event7
8 def check(self):9 if self.cancel.is_set():10 raise CancelledError("cancelled by parent")11 if time.monotonic() >= self.deadline:12 raise TimeoutError("deadline exceeded")13
14 def remaining(self) -> float:15 return max(0.0, self.deadline - time.monotonic())9 collapsed lines
16
17def run_pipeline(ctx: RunContext):18 ctx.check()19 data_a = fetch_from_api_a(ctx)20 ctx.check()21 data_b = fetch_from_api_b(ctx)22 ctx.check()23 data_c = heavy_computation(ctx)24 return merge(data_a, data_b, data_c)チェックポイントを明示的に置くことで、どこで中断されたかがログに残せます。
cleanup処理の設計
タイムアウト時に途中まで開いたリソースを確実に閉じる必要があります。
パターン1: try/finallyでまとめる
1def fetch_from_api_a(ctx: RunContext):2 session = requests.Session()3 try:4 resp = session.get(URL_A, timeout=ctx.remaining())5 return resp.json()6 finally:7 session.close()finallyは例外・タイムアウト・正常終了のすべてで実行されます。
パターン2: with文とコンテキストマネージャ
1def fetch_from_api_a(ctx: RunContext):2 with requests.Session() as session:3 resp = session.get(URL_A, timeout=ctx.remaining())4 return resp.json()対応するクラスならwithのほうが意図が明確です。
パターン3: ExitStackで動的に積む
複数のリソースを条件付きで開く場合はcontextlib.ExitStackが便利です。
1from contextlib import ExitStack2
3def run_pipeline(ctx: RunContext):4 with ExitStack() as stack:5 session = stack.enter_context(requests.Session())6 db = stack.enter_context(open_db())7 cache = stack.enter_context(open_cache())8
9 ctx.check()10 data = session.get(URL).json()11 ctx.check()12 db.write(data)13 # タイムアウトでもExitStackが全リソースを逆順でclose明示的にclose()を呼ぶより、with/ExitStackのほうが事故が少ないです。
非同期版:asyncioで書く場合
asyncioには標準でタイムアウトとキャンセルの仕組みがあります。
1import asyncio2
3async def fetch(session, url, timeout):4 async with session.get(url, timeout=timeout) as resp:5 return await resp.json()6
7async def run_pipeline():8 try:9 async with asyncio.timeout(3600): # Python 3.11+10 async with aiohttp.ClientSession() as session:11 data_a = await fetch(session, URL_A, timeout=10)12 data_b = await fetch(session, URL_B, timeout=10)13 data_c = await heavy_computation_async()14 return merge(data_a, data_b, data_c)15 except asyncio.TimeoutError:2 collapsed lines
16 logger.error("pipeline timeout")17 raiseasyncio.timeout()は内部のawaitを全てキャンセルします。ただし同期的な重い処理はキャンセルできないので、CPU処理はloop.run_in_executor()で別スレッドに逃がします。
1async def heavy_computation_async():2 loop = asyncio.get_running_loop()3 return await loop.run_in_executor(None, cpu_intensive_function)実際に踏んだ落とし穴
落とし穴1: requestsのtimeoutは接続+読み取りの合計ではない
1requests.get(url, timeout=10)このtimeout=10は**「接続の確立に10秒」「読み取りの各チャンクに10秒」**で、全体の上限ではありません。巨大なレスポンスを遅くストリームすると、数分かかることがあります。
対処:
1# 明示的にタプルで指定2requests.get(url, timeout=(5, 30)) # (接続, 読み取り各チャンク)3
4# 全体上限が必要なら自前でタイマー5resp = requests.get(url, timeout=(5, 30), stream=True)6start = time.monotonic()7chunks = []8for chunk in resp.iter_content(8192):9 if time.monotonic() - start > 60:10 resp.close()11 raise TimeoutError("read too long")12 chunks.append(chunk)落とし穴2: subprocessのtimeout後はプロセスが残る
1try:2 subprocess.run(cmd, timeout=60)3except subprocess.TimeoutExpired:4 pass # プロセスが死なずに残るTimeoutExpiredが出ても、子プロセスはまだ動いています。
対処:
1proc = subprocess.Popen(cmd)2try:3 proc.wait(timeout=60)4except subprocess.TimeoutExpired:5 proc.kill()6 proc.wait() # killの完了を待つ落とし穴3: cleanupが長すぎてタイムアウトをさらに超過
1try:2 run_long_task(deadline=3600)3except TimeoutError:4 flush_buffer_to_db() # これが5分かかる5 send_slack_notification() # これが30秒かかる6 # 外側のSIGTERMが来て中途半端に死ぬ対処:cleanupにも予算を確保します。
1# メイン処理は3600秒ではなく3300秒で切り上げる2# 残り300秒をcleanupに確保3MAIN_BUDGET = 33004CLEANUP_BUDGET = 3005
6deadline_main = time.monotonic() + MAIN_BUDGET7try:8 run_long_task(deadline=deadline_main)9finally:10 cleanup_deadline = time.monotonic() + CLEANUP_BUDGET11 cleanup(cleanup_deadline)落とし穴4: デーモンスレッドは中断されない
1t = threading.Thread(target=heavy_loop, daemon=True)2t.start()3# 親プロセス終了時、daemon=Trueなら強制終了される4# → ファイル書き込み中だったら破損する可能性対処:中断可能なループに書き直す or multiprocessingで別プロセスにします。
監視:タイムアウト発生率を記録する
タイムアウトが頻発するなら、設計の問題。発生率をメトリクスで追います。
1def run_pipeline(ctx: RunContext):2 start = time.monotonic()3 try:4 result = _run_pipeline_impl(ctx)5 metrics.histogram("pipeline.duration", time.monotonic() - start)6 return result7 except TimeoutError:8 metrics.increment("pipeline.timeout")9 raiseタイムアウト数 / 総実行数が月次で1%を超えるなら、タスク分割やチェックポイント追加を検討します。
まとめ
| 設計要素 | 推奨 |
|---|---|
| 時間の表現 | 絶対デッドライン(time.monotonic()基準) |
| キャンセル通知 | threading.Event または asyncio.CancelledError |
| リソース管理 | with/ExitStack で確実にclose |
| cleanup予算 | メイン処理の10〜20%を確保 |
| CPU処理 | チェックポイント入りループ、または別プロセス化 |
| 非同期版 | asyncio.timeout()(3.11+) |
タイムアウトを「起こさない」設計ではなく、「起きたときに正しく後始末できる」設計を目指します。長時間タスクは必ずどこかで落ちる前提で、デッドラインを持ち回り、cleanup予算を確保しておきます。