Skip to content

Commit

Permalink
fix: poll for shell and iopub socket at the same time to avoid hittin…
Browse files Browse the repository at this point in the history
…g high water mark
  • Loading branch information
maartenbreddels committed Feb 5, 2020
1 parent 5fed372 commit 17d4798
Showing 1 changed file with 56 additions and 0 deletions.
56 changes: 56 additions & 0 deletions voila/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@
#############################################################################
import collections
import logging
try:
from time import monotonic # Py 3
except ImportError:
from time import time as monotonic # Py 2

from nbconvert.preprocessors import ClearOutputPreprocessor
from nbconvert.preprocessors.execute import CellExecutionError, ExecutePreprocessor
from nbformat.v4 import output_from_msg
import zmq

from ipykernel.jsonutil import json_clean

Expand Down Expand Up @@ -226,6 +231,57 @@ def clear_output(self, outs, msg, cell_index):
return
super(VoilaExecutePreprocessor, self).clear_output(outs, msg, cell_index)

def run_cell(self, cell, cell_index=0, store_history=False):
parent_msg_id = self.kc.execute(cell.source, store_history=store_history, stop_on_error=not self.allow_errors)
self.log.debug("Executing cell:\n%s", cell.source)
exec_timeout = self._get_timeout(cell)
deadline = None
if exec_timeout is not None:
deadline = monotonic() + exec_timeout

cell.outputs = []
self.clear_before_next_output = False

# we need to have a reply, and return to idle before we can consider the cell executed
idle = False
execute_reply = None
while not idle or execute_reply is None:
# we want to timeout regularly, to see if the kernel is still alive
# this is tested in preprocessors/test/test_execute.py#test_kernel_death
# this actually fakes the kernel death, and we might be able to use the xlist
# to detect a disconnected kernel
timeout = min(1, deadline - monotonic())
# if we interrupt on timeout, we allow 1 seconds to pass till deadline
# to make sure we get the interrupt message
if timeout >= (-1 if self.interrupt_on_timeout else 0):
# we include 0, which simply is a poll to see if we have messages left
rlist, wlist, xlist = zmq.select([self.kc.iopub_channel.socket, self.kc.shell_channel.socket], [], [], min(0, timeout))
# print("lists", rlist, wlist, xlist)
if not rlist and not wlist and not xlist:
self._check_alive()
if monotonic() > deadline:
self._handle_timeout(exec_timeout, cell)
if xlist:
raise RuntimeError("Oops, unexpected rror")
if self.kc.shell_channel.socket in rlist:
msg = self.kc.shell_channel.get_msg(block=False)
if msg['parent_header'].get('msg_id') == parent_msg_id:
execute_reply = msg
if self.kc.iopub_channel.socket in rlist:
msg = self.kc.iopub_channel.get_msg(block=False)
if msg['parent_header'].get('msg_id') == parent_msg_id:
if msg['msg_type'] == 'status' and msg['content']['execution_state'] == 'idle':
idle = True
else:
self.process_message(msg, cell, cell_index)
else:
self.log.debug("Received message for which we were not the parent: %s", msg)
else:
self._handle_timeout(exec_timeout, cell)
break

return execute_reply, cell.outputs


def executenb(nb, cwd=None, km=None, **kwargs):
resources = {}
Expand Down

0 comments on commit 17d4798

Please sign in to comment.