-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocmgmt.py
160 lines (133 loc) · 4.97 KB
/
procmgmt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import subprocess as subp
import time
import thread
import errno
from common_util import Error, Babble
###########################
# Process Management #
###########################
class SpawnLockedException(Exception):
pass
class ProcessManager:
"""Class for keeping track of child processes. All processes spawned from
this class will have their pids logged. This is used to ensure that all
child processes get properly terminated when the daemon recieves a SIGTERM
signal.
A process "start lock" is provided so that when the daemon recieves the
SIGTERM, it can safely prevent new child processes from spawning."""
def __init__(self):
self._threadlock = thread.allocate_lock()
self._pids = set()
self._startlock = False
def Popen(self, *args, **kwargs):
if self._startlock:
sys.stdout.flush()
raise SpawnLockedException("process spawning is locked")
else:
p = subp.Popen(*args, **kwargs)
self.addProcess(p.pid)
return PopenWrapper(p, self)
def call(self, args):
try:
with self.Popen(args,
stdout=subp.PIPE, stderr=subp.PIPE) as pipe:
sout, serr = pipe.communicate()
Babble("%s output:\n%s\n%s\n" % (args[0], sout, serr))
return pipe.returncode, sout, serr
except OSError, err:
if err.errno == errno.ENOENT:
Error("%s could not be found. Is it installed?" % args[0])
raise
def addProcess(self, pid):
with self._threadlock:
self._pids.add(pid)
def releaseProcess(self, pid):
with self._threadlock:
self._pids.remove(pid)
def lockProcessStart(self):
"""Prevent any new processes from being launched."""
with self._threadlock:
self._startlock = True
def unlockProcessStart(self):
with self._threadlock:
self._startlock = False
def getActivePIDs(self):
pidsCopy = None
with self._threadlock:
pidsCopy = set(self._pids)
return pidsCopy
class PopenWrapper:
"""Wrapper for a subprocess.Popen object that keeps track of when the
process finishes, and reports this to its parent ProcessManager."""
def __init__(self, pipe, procman, killtimeout=1.0):
self._pipe = pipe
self._procman = procman
self._timeout = killtimeout
self._released = False
self.returncode = None
self.stdout = pipe.stdout
self.stdin = pipe.stdin
self.stder = pipe.stderr
self.pid = pipe.pid
def poll(self):
ret = self._pipe.poll()
if ret is not None:
#process has finished. log it as done.
self.release()
return ret
def wait(self):
ret = self._pipe.wait()
self.release()
return ret
def communicate(self, input=None):
out = self._pipe.communicate(input)
self.release()
return out
def release(self):
"""Owned process is finished; report it so."""
pid = self._pipe.pid
self.returncode = self._pipe.returncode
if not self._released:
self._procman.releaseProcess(pid)
self._released = True
def __enter__(self):
return self
def __exit__(self, errtype, errval, tracebk):
"""Ensure that our child process is no longer running upon return"""
pid = self._pipe.pid
self.returncode = self._pipe.returncode
if self._pipe.poll() is not None and not self._released:
# process finished, but we haven't checked it until now.
self.release()
elif errtype is not None and not self._released:
# inner code has thrown an exception.
# terminate the child process now.
try:
self._pipe.terminate()
except OSError, err:
if err.errno == errno.ESRCH:
# process already dead
pass
else:
raise
# wait for child to finish cleaning up
t_kill = time.time() + self._timeout
while time.time() < t_kill and self._pipe.poll() is None:
time.sleep(0.05)
# out of time. kill it
if self.poll() is None:
try:
self._pipe.kill()
except OSError, err:
if err.errno == errno.ESRCH:
# process already dead
pass
else:
raise
self.release()
else:
# healthy exit of `with` stmt; but process still running.
# wait for it to finish of its own accord.
self._pipe.wait()
self.release()
DFT_MGR = ProcessManager()