45395 - シコウサクゴ -

PIDロックのstale検出:os.kill(pid, 0)でゾンビを見分ける

2026-04-12
AI駆動開発
AI駆動開発
Python
PID
ロックファイル
バッチ処理
運用
Last updated:2026-04-12
8 Minutes
1454 Words

バッチ処理の多重起動防止にPIDロック(ロックファイルにプロセスIDを書き込む方式)を使っていたら、ある日ジョブが動かなくなりました。原因はstale lock——前回のプロセスが異常終了してロックファイルが残り、新しいプロセスが起動を拒否していたのです。

この問題をos.kill(pid, 0)によるプロセス生存確認で解決した実装記録です。

問題:ロックファイルが残る

PIDロックの基本

1
LOCK_FILE = Path("/tmp/my_job.lock")
2
3
def acquire_lock() -> bool:
4
if LOCK_FILE.exists():
5
return False # 別プロセスが実行中
6
LOCK_FILE.write_text(str(os.getpid()))
7
return True
8
9
def release_lock():
10
LOCK_FILE.unlink(missing_ok=True)

正常系ではこれで十分動きます。問題は異常系です。

異常終了するとロックが残る

1
1. プロセスA起動 → lock.write(PID=1234)
2
2. プロセスA処理中...
3
3. プロセスAがSIGKILLで強制終了(またはOOM Killer)
4
4. release_lock() が呼ばれない
5
5. ロックファイルにPID=1234が残る
6
6. プロセスB起動 → lock.exists() → True → 「実行中」と判断して終了
7
7. 実際には誰も動いていない

try/finallyで対処しようとしても、SIGKILLやOOM Killerは捕捉できません。atexitも同様です。

実際に遭遇した事故

macOS launchdで管理しているデータ処理パイプラインのジョブが、メモリ不足で強制終了されました。ロックファイルが残り、以降のスケジュール実行がすべてスキップ。3日間データが更新されていなかったことに、下流の異常検知で初めて気づきました。

解決策:os.kill(pid, 0) によるプロセス生存確認

os.kill(pid, 0) とは

os.kill(pid, 0) はシグナルを送らずに、プロセスの存在だけを確認するPOSIX標準の方法です。

1
import os
2
3
def is_process_alive(pid: int) -> bool:
4
"""PIDのプロセスが生存しているか確認する"""
5
try:
6
os.kill(pid, 0) # シグナル0 = 何もしない、存在確認のみ
7
return True
8
except ProcessNotFoundError:
9
return False # プロセスが存在しない
10
except PermissionError:
11
return True # プロセスは存在する(権限不足で送信できないだけ)
結果意味
正常終了(例外なし)プロセスは存在し、シグナル送信権限もある
ProcessNotFoundErrorプロセスは存在しない → stale lock
PermissionErrorプロセスは存在する(他ユーザーのプロセス)

改良版PIDロック

1
from pathlib import Path
2
import os
3
4
LOCK_FILE = Path("/tmp/my_job.lock")
5
6
def is_process_alive(pid: int) -> bool:
7
try:
8
os.kill(pid, 0)
9
return True
10
except ProcessNotFoundError:
11
return False
12
except PermissionError:
13
return True # 存在はしている
14
15
def acquire_lock() -> bool:
29 collapsed lines
16
if LOCK_FILE.exists():
17
try:
18
old_pid = int(LOCK_FILE.read_text().strip())
19
except (ValueError, OSError):
20
# ロックファイルが壊れている → stale
21
LOCK_FILE.unlink(missing_ok=True)
22
LOCK_FILE.write_text(str(os.getpid()))
23
return True
24
25
if is_process_alive(old_pid):
26
return False # 本当に実行中
27
else:
28
# プロセスは死んでいる → stale lock
29
LOCK_FILE.unlink(missing_ok=True)
30
LOCK_FILE.write_text(str(os.getpid()))
31
return True
32
33
LOCK_FILE.write_text(str(os.getpid()))
34
return True
35
36
def release_lock():
37
# 自分のPIDの場合のみ削除(別プロセスのロックを消さない)
38
if LOCK_FILE.exists():
39
try:
40
lock_pid = int(LOCK_FILE.read_text().strip())
41
if lock_pid == os.getpid():
42
LOCK_FILE.unlink()
43
except (ValueError, OSError):
44
pass

PID再利用問題への対処

PIDは再利用される

OSはPIDを再利用します。プロセスAがPID=1234で終了した後、まったく別のプロセスBがPID=1234で起動することがあります。

1
1. ジョブX(PID=1234)が異常終了 → ロックにPID=1234が残る
2
2. 別のプロセス(例: Chromeの子プロセス)がPID=1234で起動
3
3. os.kill(1234, 0) → True(生存している!)
4
4. ジョブXは動いていないのに、ロックが解放されない

対策: PIDだけでなくプロセス情報も記録

1
import json
2
3
def acquire_lock() -> bool:
4
if LOCK_FILE.exists():
5
try:
6
lock_info = json.loads(LOCK_FILE.read_text())
7
old_pid = lock_info["pid"]
8
old_name = lock_info["name"]
9
except (json.JSONDecodeError, KeyError, OSError):
10
LOCK_FILE.unlink(missing_ok=True)
11
_write_lock()
12
return True
13
14
if is_process_alive(old_pid):
15
# プロセスは生きているが、本当に同じジョブか?
29 collapsed lines
16
try:
17
import psutil
18
proc = psutil.Process(old_pid)
19
if proc.name() != old_name:
20
# PIDが再利用されている → stale
21
LOCK_FILE.unlink(missing_ok=True)
22
_write_lock()
23
return True
24
except (psutil.NoSuchProcess, psutil.AccessDenied):
25
LOCK_FILE.unlink(missing_ok=True)
26
_write_lock()
27
return True
28
return False # 同じジョブが実行中
29
else:
30
LOCK_FILE.unlink(missing_ok=True)
31
_write_lock()
32
return True
33
34
_write_lock()
35
return True
36
37
def _write_lock():
38
import psutil
39
lock_info = {
40
"pid": os.getpid(),
41
"name": psutil.Process(os.getpid()).name(),
42
"started": datetime.now().isoformat(),
43
}
44
LOCK_FILE.write_text(json.dumps(lock_info))

psutilを使わない軽量版

外部ライブラリに依存したくない場合、/proc(Linux)やプロセス起動時刻で判定する方法もあります。

1
def get_process_start_time(pid: int) -> float | None:
2
"""プロセスの起動時刻を取得(macOS/Linux)"""
3
try:
4
import subprocess
5
result = subprocess.run(
6
["ps", "-o", "lstart=", "-p", str(pid)],
7
capture_output=True, text=True
8
)
9
if result.returncode == 0 and result.stdout.strip():
10
from email.utils import parsedate_to_datetime
11
return parsedate_to_datetime(result.stdout.strip()).timestamp()
12
except Exception:
13
pass
14
return None

ロックファイルに起動時刻も記録し、現在のPIDの起動時刻と一致しなければstaleと判定します。

コンテキストマネージャー化

実際の運用では、コンテキストマネージャーにして使いやすくします。

1
from contextlib import contextmanager
2
3
@contextmanager
4
def job_lock(name: str):
5
lock_path = Path(f"/tmp/{name}.lock")
6
# acquire_lockのロジック(上記参照)
7
acquired = acquire_lock_for(lock_path)
8
if not acquired:
9
raise RuntimeError(f"ジョブ '{name}' は既に実行中です")
10
try:
11
yield
12
finally:
13
release_lock_for(lock_path)
14
15
# 使い方
5 collapsed lines
16
def main():
17
with job_lock("daily_fetch"):
18
# ここで処理
19
fetch_data()
20
process_data()

実践で学んだこと

1. stale lock検出のログは必ず残す

1
if not is_process_alive(old_pid):
2
logger.warning(
3
f"Stale lock検出: PID={old_pid} は存在しない。ロックを解放します"
4
)

stale lockの発生頻度は、システムの健全性の指標になります。頻発するなら、そもそもの異常終了を調査すべきです。

2. ロックファイルのパスを統一する

1
/tmp/myapp/locks/job_name.lock

散らばると管理できません。ディレクトリを決めて統一します。

3. 監視ツールにstale lock検出を組み込む

定期的にロックファイルを走査し、stale lockを自動検出・自動解放する仕組みがあると安心です。ただし自動解放は誤判定のリスクがあるので、通知→手動確認→解放のフローが安全です。

まとめ

方式多重起動防止stale lock対応PID再利用対応
単純なロックファイル××
PID記録 + os.kill(pid,0)×
PID + プロセス名/起動時刻

os.kill(pid, 0) はPOSIX標準で、追加ライブラリ不要です。まずはこれだけで十分なケースが多いです。PID再利用が問題になる環境では、プロセス名や起動時刻による追加検証を加えます。

ロックの仕組みは「正常系」だけで設計すると必ず破綻します。「プロセスが突然死んだらどうなるか?」を常に問いながら設計することが大切です。

Article title:PIDロックのstale検出:os.kill(pid, 0)でゾンビを見分ける
Article author:45395
Release time:2026-04-12

記事へのご質問・ご感想をお聞かせください

フィードバックを送る