Skip to content
This repository has been archived by the owner on Apr 12, 2018. It is now read-only.

Commit

Permalink
Update process #6: make Process.on_sigchld() depend on Process.join()
Browse files Browse the repository at this point in the history
Process.join() now disables Process.on_sigchld() by restablishing the
default handler SIG_DFL. Process.on_sigchld() calls Process.join() to
avoid calling os.waitpid() in two difference places because it could
interrupted and then resumed in the wrong place.
  • Loading branch information
Greg Leclercq authored and ggreg committed Mar 21, 2014
1 parent c71b31d commit 86a7719
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 8 deletions.
16 changes: 8 additions & 8 deletions pkit/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,7 @@ def __repr__(self):

def on_sigchld(self, signum, sigframe):
if self._child is not None and self._child.pid:
pid, status = os.waitpid(self._child.pid, os.WNOHANG)
if pid == self._child.pid:
self._exitcode = os.WEXITSTATUS(status)

if self._on_exit:
self._on_exit(self)

self.clean()
self.join()

def create(self):
"""Method to be called when the process child is forked"""
Expand Down Expand Up @@ -320,6 +313,8 @@ def join(self, timeout=None):
:param timeout: Time to wait for the process exit
:type timeout: float
"""
signal.signal(signal.SIGCHLD, signal.SIG_DFL)

if self._child is None:
raise RuntimeError("Can only join a started process")

Expand All @@ -328,6 +323,11 @@ def join(self, timeout=None):
except OSError:
pass

if self._on_exit:
self._on_exit(self)

self.clean()

def terminate(self, wait=False):
"""Forces the process to stop
Expand Down
16 changes: 16 additions & 0 deletions tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,19 @@ def test_restart_raises_with_invalid_policy(self):
process.restart("that's definetly invalid")

_collect_process(process)

def test_process_on_exit_is_called(self):
def acquire():
sem.acquire()

def release(process):
sem.release()

from multiprocessing import Semaphore
sem = Semaphore(1)

process = Process(target=acquire,
on_exit=release)
process.start(wait=True)
process.join()
assert sem.get_value() == 1

0 comments on commit 86a7719

Please sign in to comment.