45395 - シコウサクゴ -

長時間タスクのタイムアウト伝播:親から子へのキャンセル設計

2026-04-16
AI駆動開発
AI駆動開発
Python
非同期処理
タイムアウト
subprocess
運用
Last updated:2026-04-21
9 Minutes
1645 Words

データ処理パイプラインで、1時間制限の長時間タスクが複数の外部APIを呼ぶ構成でした。あるAPIがハングすると、全体のタイムアウトまで他のAPI接続が閉じられず、FD・メモリ・DBコネクションが溜まり続ける問題が発生。

タイムアウトを親から子へ正しく伝播させるための設計パターンと、実装で踏んだ落とし穴をまとめます。

問題:タイムアウトは「自動的に伝播しない」

典型的なNGコード

1
def run_pipeline(timeout_sec: int = 3600):
2
data_a = fetch_from_api_a() # 内部でrequests.get(timeout=10)
3
data_b = fetch_from_api_b() # 内部でrequests.get(timeout=10)
4
data_c = heavy_computation() # CPU処理
5
return merge(data_a, data_b, data_c)

このコードにはtimeout_sec=3600の引数がありますが、どこにも使われていません。個別のAPIはtimeout=10で制限されていますが、全体がどれだけ時間を食ってもそれ自体は止まりません。

何が起きるか

1
13:00:00 pipeline開始
2
13:00:05 fetch_api_a 完了(5秒)
3
13:00:15 fetch_api_b 完了(10秒)
4
13:00:16 heavy_computation 開始
5
13:58:00 heavy_computation まだ実行中
6
14:00:00 親プロセスの外側から SIGTERM
7
14:00:00 計算中のデータは破棄、DBコネクションは閉じられず残る

外側(launchdやsystemd)からSIGTERMされる頃には、どこまで進んだか・何を中断すべきかがわからない状態になっています。

設計原則:デッドラインを持ち回る

タイムアウトを「残り時間」ではなく**「絶対時刻(デッドライン)」**として持ち回ります。

1
import time
2
3
def run_pipeline(deadline: float):
4
data_a = fetch_from_api_a(deadline)
5
data_b = fetch_from_api_b(deadline)
6
data_c = heavy_computation(deadline)
7
return merge(data_a, data_b, data_c)
8
9
# 呼び出し側
10
deadline = time.monotonic() + 3600
11
run_pipeline(deadline)

各関数の中でremaining = deadline - time.monotonic()を計算し、その時間だけ外部API待ちをします。

1
def fetch_from_api_a(deadline: float):
2
remaining = deadline - time.monotonic()
3
if remaining <= 0:
4
raise TimeoutError("pipeline deadline exceeded")
5
timeout = min(remaining, 10) # APIごとの上限とデッドラインの小さい方
6
return requests.get(URL_A, timeout=timeout).json()

time.monotonic()を使うのがポイント。time.time()は時計の調整で戻ることがありますが、monotonicは単調増加です。

キャンセルシグナルの伝播

threading.Eventでキャンセルを通知

長時間のCPU処理やループには、明示的なキャンセルチェックを入れます。

1
import threading
2
3
def heavy_computation(deadline: float, cancel: threading.Event):
4
result = []
5
for i in range(10_000):
6
if cancel.is_set() or time.monotonic() >= deadline:
7
raise TimeoutError(f"cancelled at iteration {i}")
8
result.append(process_item(i))
9
return result
10
11
# 呼び出し側
12
cancel = threading.Event()
13
try:
14
run_pipeline(deadline, cancel)
15
except TimeoutError:
2 collapsed lines
16
cancel.set() # 他のスレッドにも伝播
17
cleanup()

Eventを全関数に渡すと引数が膨らむので、実際はコンテキストオブジェクトにまとめます。

1
from dataclasses import dataclass
2
3
@dataclass
4
class RunContext:
5
deadline: float
6
cancel: threading.Event
7
8
def check(self):
9
if self.cancel.is_set():
10
raise CancelledError("cancelled by parent")
11
if time.monotonic() >= self.deadline:
12
raise TimeoutError("deadline exceeded")
13
14
def remaining(self) -> float:
15
return max(0.0, self.deadline - time.monotonic())
9 collapsed lines
16
17
def run_pipeline(ctx: RunContext):
18
ctx.check()
19
data_a = fetch_from_api_a(ctx)
20
ctx.check()
21
data_b = fetch_from_api_b(ctx)
22
ctx.check()
23
data_c = heavy_computation(ctx)
24
return merge(data_a, data_b, data_c)

チェックポイントを明示的に置くことで、どこで中断されたかがログに残せます

cleanup処理の設計

タイムアウト時に途中まで開いたリソースを確実に閉じる必要があります。

パターン1: try/finallyでまとめる

1
def fetch_from_api_a(ctx: RunContext):
2
session = requests.Session()
3
try:
4
resp = session.get(URL_A, timeout=ctx.remaining())
5
return resp.json()
6
finally:
7
session.close()

finallyは例外・タイムアウト・正常終了のすべてで実行されます。

パターン2: with文とコンテキストマネージャ

1
def fetch_from_api_a(ctx: RunContext):
2
with requests.Session() as session:
3
resp = session.get(URL_A, timeout=ctx.remaining())
4
return resp.json()

対応するクラスならwithのほうが意図が明確です。

パターン3: ExitStackで動的に積む

複数のリソースを条件付きで開く場合はcontextlib.ExitStackが便利です。

1
from contextlib import ExitStack
2
3
def run_pipeline(ctx: RunContext):
4
with ExitStack() as stack:
5
session = stack.enter_context(requests.Session())
6
db = stack.enter_context(open_db())
7
cache = stack.enter_context(open_cache())
8
9
ctx.check()
10
data = session.get(URL).json()
11
ctx.check()
12
db.write(data)
13
# タイムアウトでもExitStackが全リソースを逆順でclose

明示的にclose()を呼ぶより、with/ExitStackのほうが事故が少ないです。

非同期版:asyncioで書く場合

asyncioには標準でタイムアウトとキャンセルの仕組みがあります。

1
import asyncio
2
3
async def fetch(session, url, timeout):
4
async with session.get(url, timeout=timeout) as resp:
5
return await resp.json()
6
7
async def run_pipeline():
8
try:
9
async with asyncio.timeout(3600): # Python 3.11+
10
async with aiohttp.ClientSession() as session:
11
data_a = await fetch(session, URL_A, timeout=10)
12
data_b = await fetch(session, URL_B, timeout=10)
13
data_c = await heavy_computation_async()
14
return merge(data_a, data_b, data_c)
15
except asyncio.TimeoutError:
2 collapsed lines
16
logger.error("pipeline timeout")
17
raise

asyncio.timeout()は内部のawaitを全てキャンセルします。ただし同期的な重い処理はキャンセルできないので、CPU処理はloop.run_in_executor()で別スレッドに逃がします。

1
async def heavy_computation_async():
2
loop = asyncio.get_running_loop()
3
return await loop.run_in_executor(None, cpu_intensive_function)

実際に踏んだ落とし穴

落とし穴1: requestsのtimeoutは接続+読み取りの合計ではない

1
requests.get(url, timeout=10)

このtimeout=10は**「接続の確立に10秒」「読み取りの各チャンクに10秒」**で、全体の上限ではありません。巨大なレスポンスを遅くストリームすると、数分かかることがあります。

対処:

1
# 明示的にタプルで指定
2
requests.get(url, timeout=(5, 30)) # (接続, 読み取り各チャンク)
3
4
# 全体上限が必要なら自前でタイマー
5
resp = requests.get(url, timeout=(5, 30), stream=True)
6
start = time.monotonic()
7
chunks = []
8
for chunk in resp.iter_content(8192):
9
if time.monotonic() - start > 60:
10
resp.close()
11
raise TimeoutError("read too long")
12
chunks.append(chunk)

落とし穴2: subprocessのtimeout後はプロセスが残る

1
try:
2
subprocess.run(cmd, timeout=60)
3
except subprocess.TimeoutExpired:
4
pass # プロセスが死なずに残る

TimeoutExpiredが出ても、子プロセスはまだ動いています。

対処:

1
proc = subprocess.Popen(cmd)
2
try:
3
proc.wait(timeout=60)
4
except subprocess.TimeoutExpired:
5
proc.kill()
6
proc.wait() # killの完了を待つ

落とし穴3: cleanupが長すぎてタイムアウトをさらに超過

1
try:
2
run_long_task(deadline=3600)
3
except TimeoutError:
4
flush_buffer_to_db() # これが5分かかる
5
send_slack_notification() # これが30秒かかる
6
# 外側のSIGTERMが来て中途半端に死ぬ

対処:cleanupにも予算を確保します。

1
# メイン処理は3600秒ではなく3300秒で切り上げる
2
# 残り300秒をcleanupに確保
3
MAIN_BUDGET = 3300
4
CLEANUP_BUDGET = 300
5
6
deadline_main = time.monotonic() + MAIN_BUDGET
7
try:
8
run_long_task(deadline=deadline_main)
9
finally:
10
cleanup_deadline = time.monotonic() + CLEANUP_BUDGET
11
cleanup(cleanup_deadline)

落とし穴4: デーモンスレッドは中断されない

1
t = threading.Thread(target=heavy_loop, daemon=True)
2
t.start()
3
# 親プロセス終了時、daemon=Trueなら強制終了される
4
# → ファイル書き込み中だったら破損する可能性

対処:中断可能なループに書き直す or multiprocessingで別プロセスにします。

監視:タイムアウト発生率を記録する

タイムアウトが頻発するなら、設計の問題。発生率をメトリクスで追います

1
def run_pipeline(ctx: RunContext):
2
start = time.monotonic()
3
try:
4
result = _run_pipeline_impl(ctx)
5
metrics.histogram("pipeline.duration", time.monotonic() - start)
6
return result
7
except TimeoutError:
8
metrics.increment("pipeline.timeout")
9
raise

タイムアウト数 / 総実行数が月次で1%を超えるなら、タスク分割やチェックポイント追加を検討します。

まとめ

設計要素推奨
時間の表現絶対デッドライン(time.monotonic()基準)
キャンセル通知threading.Event または asyncio.CancelledError
リソース管理with/ExitStack で確実にclose
cleanup予算メイン処理の10〜20%を確保
CPU処理チェックポイント入りループ、または別プロセス化
非同期版asyncio.timeout()(3.11+)

タイムアウトを「起こさない」設計ではなく、「起きたときに正しく後始末できる」設計を目指します。長時間タスクは必ずどこかで落ちる前提で、デッドラインを持ち回り、cleanup予算を確保しておきます。

Article title:長時間タスクのタイムアウト伝播:親から子へのキャンセル設計
Article author:45395
Release time:2026-04-16

記事へのご質問・ご感想をお聞かせください

フィードバックを送る