Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OutStream hooks not being called #1367

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import atexit
import contextvars
import functools
import io
import os
import sys
Expand Down Expand Up @@ -620,10 +621,19 @@ def flush(self):
else:
self._flush()

@property
def _flush(self):
"""Prepare _flush_impl partial to be scheduled on the IO thread.

This indirection is necessary to ensure _flush_impl calls hooks
registered from the current thread (as they are thread-local).
"""
return functools.partial(self._flush_impl, self._hooks)

def _flush_impl(self, hooks=()):
"""This is where the actual send happens.

_flush should generally be called in the IO thread,
_flush_impl should generally be called in the IO thread,
unless the thread has been destroyed (e.g. forked subprocess).
"""
self._flush_pending = False
Expand All @@ -648,7 +658,7 @@ def _flush(self):
# Each transform either returns a new
# message or None. If None is returned,
# the message has been 'used' and we return.
for hook in self._hooks:
for hook in hooks:
msg = hook(msg)
if msg is None:
return
Expand Down
19 changes: 19 additions & 0 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,25 @@ async def test_outstream(anyio_backend, iopub_thread):
assert stream.writable()


async def test_outstream_hooks(anyio_backend, iopub_thread):
session = Session()

stream = OutStream(session, iopub_thread, "stdout")

with stream:
hook_called = False

def hook(msg):
nonlocal hook_called
hook_called = True
return msg

stream.register_hook(hook)
stream.write("hi")
stream.flush()
assert hook_called


@pytest.mark.anyio()
async def test_event_pipe_gc(iopub_thread):
session = Session(key=b"abc")
Expand Down
Loading