バッチ処理の多重起動防止にPIDロック(ロックファイルにプロセスIDを書き込む方式)を使っていたら、ある日ジョブが動かなくなりました。原因はstale lock——前回のプロセスが異常終了してロックファイルが残り、新しいプロセスが起動を拒否していたのです。
この問題をos.kill(pid, 0)によるプロセス生存確認で解決した実装記録です。
問題:ロックファイルが残る
PIDロックの基本
1LOCK_FILE = Path("/tmp/my_job.lock")2
3def acquire_lock() -> bool:4 if LOCK_FILE.exists():5 return False # 別プロセスが実行中6 LOCK_FILE.write_text(str(os.getpid()))7 return True8
9def release_lock():10 LOCK_FILE.unlink(missing_ok=True)正常系ではこれで十分動きます。問題は異常系です。
異常終了するとロックが残る
11. プロセスA起動 → lock.write(PID=1234)22. プロセスA処理中...33. プロセスAがSIGKILLで強制終了(またはOOM Killer)44. release_lock() が呼ばれない55. ロックファイルにPID=1234が残る66. プロセスB起動 → lock.exists() → True → 「実行中」と判断して終了77. 実際には誰も動いていないtry/finallyで対処しようとしても、SIGKILLやOOM Killerは捕捉できません。atexitも同様です。
実際に遭遇した事故
macOS launchdで管理しているデータ処理パイプラインのジョブが、メモリ不足で強制終了されました。ロックファイルが残り、以降のスケジュール実行がすべてスキップ。3日間データが更新されていなかったことに、下流の異常検知で初めて気づきました。
解決策:os.kill(pid, 0) によるプロセス生存確認
os.kill(pid, 0) とは
os.kill(pid, 0) はシグナルを送らずに、プロセスの存在だけを確認するPOSIX標準の方法です。
1import os2
3def is_process_alive(pid: int) -> bool:4 """PIDのプロセスが生存しているか確認する"""5 try:6 os.kill(pid, 0) # シグナル0 = 何もしない、存在確認のみ7 return True8 except ProcessNotFoundError:9 return False # プロセスが存在しない10 except PermissionError:11 return True # プロセスは存在する(権限不足で送信できないだけ)| 結果 | 意味 |
|---|---|
| 正常終了(例外なし) | プロセスは存在し、シグナル送信権限もある |
ProcessNotFoundError | プロセスは存在しない → stale lock |
PermissionError | プロセスは存在する(他ユーザーのプロセス) |
改良版PIDロック
1from pathlib import Path2import os3
4LOCK_FILE = Path("/tmp/my_job.lock")5
6def is_process_alive(pid: int) -> bool:7 try:8 os.kill(pid, 0)9 return True10 except ProcessNotFoundError:11 return False12 except PermissionError:13 return True # 存在はしている14
15def acquire_lock() -> bool:29 collapsed lines
16 if LOCK_FILE.exists():17 try:18 old_pid = int(LOCK_FILE.read_text().strip())19 except (ValueError, OSError):20 # ロックファイルが壊れている → stale21 LOCK_FILE.unlink(missing_ok=True)22 LOCK_FILE.write_text(str(os.getpid()))23 return True24
25 if is_process_alive(old_pid):26 return False # 本当に実行中27 else:28 # プロセスは死んでいる → stale lock29 LOCK_FILE.unlink(missing_ok=True)30 LOCK_FILE.write_text(str(os.getpid()))31 return True32
33 LOCK_FILE.write_text(str(os.getpid()))34 return True35
36def release_lock():37 # 自分のPIDの場合のみ削除(別プロセスのロックを消さない)38 if LOCK_FILE.exists():39 try:40 lock_pid = int(LOCK_FILE.read_text().strip())41 if lock_pid == os.getpid():42 LOCK_FILE.unlink()43 except (ValueError, OSError):44 passPID再利用問題への対処
PIDは再利用される
OSはPIDを再利用します。プロセスAがPID=1234で終了した後、まったく別のプロセスBがPID=1234で起動することがあります。
11. ジョブX(PID=1234)が異常終了 → ロックにPID=1234が残る22. 別のプロセス(例: Chromeの子プロセス)がPID=1234で起動33. os.kill(1234, 0) → True(生存している!)44. ジョブXは動いていないのに、ロックが解放されない対策: PIDだけでなくプロセス情報も記録
1import json2
3def acquire_lock() -> bool:4 if LOCK_FILE.exists():5 try:6 lock_info = json.loads(LOCK_FILE.read_text())7 old_pid = lock_info["pid"]8 old_name = lock_info["name"]9 except (json.JSONDecodeError, KeyError, OSError):10 LOCK_FILE.unlink(missing_ok=True)11 _write_lock()12 return True13
14 if is_process_alive(old_pid):15 # プロセスは生きているが、本当に同じジョブか?29 collapsed lines
16 try:17 import psutil18 proc = psutil.Process(old_pid)19 if proc.name() != old_name:20 # PIDが再利用されている → stale21 LOCK_FILE.unlink(missing_ok=True)22 _write_lock()23 return True24 except (psutil.NoSuchProcess, psutil.AccessDenied):25 LOCK_FILE.unlink(missing_ok=True)26 _write_lock()27 return True28 return False # 同じジョブが実行中29 else:30 LOCK_FILE.unlink(missing_ok=True)31 _write_lock()32 return True33
34 _write_lock()35 return True36
37def _write_lock():38 import psutil39 lock_info = {40 "pid": os.getpid(),41 "name": psutil.Process(os.getpid()).name(),42 "started": datetime.now().isoformat(),43 }44 LOCK_FILE.write_text(json.dumps(lock_info))psutilを使わない軽量版
外部ライブラリに依存したくない場合、/proc(Linux)やプロセス起動時刻で判定する方法もあります。
1def get_process_start_time(pid: int) -> float | None:2 """プロセスの起動時刻を取得(macOS/Linux)"""3 try:4 import subprocess5 result = subprocess.run(6 ["ps", "-o", "lstart=", "-p", str(pid)],7 capture_output=True, text=True8 )9 if result.returncode == 0 and result.stdout.strip():10 from email.utils import parsedate_to_datetime11 return parsedate_to_datetime(result.stdout.strip()).timestamp()12 except Exception:13 pass14 return Noneロックファイルに起動時刻も記録し、現在のPIDの起動時刻と一致しなければstaleと判定します。
コンテキストマネージャー化
実際の運用では、コンテキストマネージャーにして使いやすくします。
1from contextlib import contextmanager2
3@contextmanager4def job_lock(name: str):5 lock_path = Path(f"/tmp/{name}.lock")6 # acquire_lockのロジック(上記参照)7 acquired = acquire_lock_for(lock_path)8 if not acquired:9 raise RuntimeError(f"ジョブ '{name}' は既に実行中です")10 try:11 yield12 finally:13 release_lock_for(lock_path)14
15# 使い方5 collapsed lines
16def main():17 with job_lock("daily_fetch"):18 # ここで処理19 fetch_data()20 process_data()実践で学んだこと
1. stale lock検出のログは必ず残す
1if not is_process_alive(old_pid):2 logger.warning(3 f"Stale lock検出: PID={old_pid} は存在しない。ロックを解放します"4 )stale lockの発生頻度は、システムの健全性の指標になります。頻発するなら、そもそもの異常終了を調査すべきです。
2. ロックファイルのパスを統一する
1/tmp/myapp/locks/job_name.lock散らばると管理できません。ディレクトリを決めて統一します。
3. 監視ツールにstale lock検出を組み込む
定期的にロックファイルを走査し、stale lockを自動検出・自動解放する仕組みがあると安心です。ただし自動解放は誤判定のリスクがあるので、通知→手動確認→解放のフローが安全です。
まとめ
| 方式 | 多重起動防止 | stale lock対応 | PID再利用対応 |
|---|---|---|---|
| 単純なロックファイル | ○ | × | × |
| PID記録 + os.kill(pid,0) | ○ | ○ | × |
| PID + プロセス名/起動時刻 | ○ | ○ | ○ |
os.kill(pid, 0) はPOSIX標準で、追加ライブラリ不要です。まずはこれだけで十分なケースが多いです。PID再利用が問題になる環境では、プロセス名や起動時刻による追加検証を加えます。
ロックの仕組みは「正常系」だけで設計すると必ず破綻します。「プロセスが突然死んだらどうなるか?」を常に問いながら設計することが大切です。