Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use zmq-anyio #1291

Open
wants to merge 87 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
87 commits
Select commit Hold shift + click to select a range
3c14166
Use zmq-anyio
davidbrochart Nov 7, 2024
b3c1fcf
Replace thread add_task with start_soon
davidbrochart Nov 15, 2024
d9ae3fc
Replace _IOPubThread with BaseThread
davidbrochart Nov 15, 2024
093d959
Fix tests
davidbrochart Nov 15, 2024
b789ab6
Allow testing trio
davidbrochart Nov 15, 2024
18a8467
Remove pytest-asyncio from test dependencies
davidbrochart Nov 20, 2024
2d83916
Use selector thread from anyio
davidbrochart Nov 20, 2024
19ebe89
Test more Python versions on Windows
davidbrochart Nov 20, 2024
5bd6f36
Use anyio's alternate selector thread
davidbrochart Dec 1, 2024
d91abc7
-
davidbrochart Dec 1, 2024
ed4b682
-
davidbrochart Dec 1, 2024
4b73bc1
-
davidbrochart Dec 5, 2024
333f71c
-
davidbrochart Dec 17, 2024
fe2be51
-
davidbrochart Dec 17, 2024
e125d6b
-
davidbrochart Dec 17, 2024
1a3f0f6
-
davidbrochart Dec 17, 2024
30ee9f5
-
davidbrochart Dec 17, 2024
7137e7a
Enable tracemalloc
davidbrochart Dec 17, 2024
7b4abc2
Update .github/workflows/ci.yml
Carreau Dec 17, 2024
142e033
Update .github/workflows/ci.yml
Carreau Dec 17, 2024
529681a
-
davidbrochart Dec 20, 2024
0f8725b
-
davidbrochart Dec 20, 2024
7d42069
-
davidbrochart Dec 20, 2024
6d404fd
-
davidbrochart Dec 20, 2024
87afdcc
-
davidbrochart Dec 20, 2024
b5d7542
-
davidbrochart Feb 10, 2025
3a14e83
-
davidbrochart Feb 10, 2025
eefa576
Use zmq-anyio v0.3.0
davidbrochart Feb 11, 2025
b555892
Workwaround for not suspending AnyIO socket selector thread
davidbrochart Feb 12, 2025
c217f9d
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 12, 2025
871da2b
Update .github/workflows/ci.yml
davidbrochart Feb 12, 2025
70dd107
Update .github/workflows/ci.yml
Carreau Feb 12, 2025
5f5fb47
Fix iopub_thread fixture
davidbrochart Feb 13, 2025
9ab982f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 13, 2025
92975d9
-
davidbrochart Feb 13, 2025
0919954
-
davidbrochart Feb 13, 2025
8b57cd1
-
davidbrochart Feb 13, 2025
1b781fb
Merge branch 'main' into zmq_anyio
davidbrochart Feb 13, 2025
55e0f31
Remove timeout
davidbrochart Feb 13, 2025
3c3bc57
Fix test_embed_kernel
davidbrochart Feb 13, 2025
1a9b5f2
Merge branch 'main' into zmq_anyio
davidbrochart Feb 13, 2025
1b1ec80
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 13, 2025
cefa3e9
Fix test_start_kernel
davidbrochart Feb 13, 2025
c3c6418
Merge branch 'main' into zmq_anyio
davidbrochart Feb 14, 2025
b7d7144
Merge branch 'main' into zmq_anyio
davidbrochart Feb 14, 2025
a6de84e
Fix test_asyncio_loop
davidbrochart Feb 17, 2025
e0e0bf6
Merge branch 'main' into zmq_anyio
davidbrochart Feb 20, 2025
df3f084
Merge branch 'main' into zmq_anyio
davidbrochart Feb 20, 2025
076d3da
Run test_asyncio_loop on asyncio backend only
davidbrochart Feb 20, 2025
7324c05
Fix typing
davidbrochart Feb 20, 2025
b9b4803
Fix pypy
davidbrochart Feb 20, 2025
7f5250a
Reset CI to main
davidbrochart Feb 20, 2025
1d2f6be
Add back Windows runs in CI matrix
davidbrochart Feb 20, 2025
fd3aa85
-
davidbrochart Feb 20, 2025
bf87ed8
-
davidbrochart Feb 20, 2025
a573494
Merge branch 'main' into zmq_anyio
davidbrochart Feb 20, 2025
a9e9e47
Review
davidbrochart Feb 20, 2025
c174f1b
Don't test autoawait on trio
davidbrochart Feb 20, 2025
192d62e
Add back pytest.mark.anyio
davidbrochart Feb 20, 2025
f13f7d7
Merge branch 'main' into zmq_anyio
davidbrochart Feb 20, 2025
c9e9108
Set async event in async context
davidbrochart Feb 21, 2025
c458479
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 21, 2025
e7706c3
Remove trio_runner
davidbrochart Feb 21, 2025
75c3fad
Merge remote-tracking branch 'origin/zmq_anyio' into zmq_anyio
davidbrochart Feb 21, 2025
255cd27
Remove trio_runner from docs
davidbrochart Feb 21, 2025
4327f17
Merge branch 'main' into zmq_anyio
Carreau Feb 23, 2025
4f83ff3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 23, 2025
15bb2fa
Merge branch 'main' into zmq_anyio
davidbrochart Feb 25, 2025
cf8144a
Install ipython main
davidbrochart Feb 25, 2025
2bf0c6f
Invoke garbage collector, drop python 3.9 and 3.10
davidbrochart Feb 25, 2025
1b1a0a1
Don't test on pypy, add more gc.collect()
davidbrochart Feb 25, 2025
e3ad3e1
Limit IPython HistoryManager instance number to 1
davidbrochart Feb 25, 2025
c4cb536
Move garbage collection to session fixture
davidbrochart Feb 25, 2025
3f89427
fix leak
Carreau Feb 25, 2025
6bea48d
fix 2 more leaks
Carreau Feb 25, 2025
38405df
try to fix at_exit_once
Carreau Feb 25, 2025
024ffb0
Move watching shell_stop event to start()
davidbrochart Feb 25, 2025
a7920e0
Skip test_comm on Trio
davidbrochart Feb 25, 2025
0c7e88e
Skip test_eventloop on Trio
davidbrochart Feb 25, 2025
f21dd29
Remove ipython install from github
davidbrochart Feb 25, 2025
acb0fad
-
davidbrochart Feb 25, 2025
2c08818
Merge branch 'main' into zmq_anyio
davidbrochart Feb 26, 2025
21fd27a
-
davidbrochart Feb 26, 2025
6853f39
Merge branch 'main' into zmq_anyio
davidbrochart Feb 27, 2025
bc75196
-
davidbrochart Feb 27, 2025
1dba7da
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 27, 2025
73ae49d
-
davidbrochart Feb 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions docs/api/ipykernel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,6 @@ Submodules
:show-inheritance:


.. automodule:: ipykernel.trio_runner
:members:
:undoc-members:
:show-inheritance:


.. automodule:: ipykernel.zmqshell
:members:
:undoc-members:
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async def _send_request(self, msg):
self.log.debug("DEBUGPYCLIENT:")
self.log.debug(self.routing_id)
self.log.debug(buf)
await self.debugpy_socket.send_multipart((self.routing_id, buf))
await self.debugpy_socket.asend_multipart((self.routing_id, buf)).wait()

async def _wait_for_response(self):
# Since events are never pushed to the message_queue
Expand Down Expand Up @@ -438,7 +438,7 @@ async def start(self):
(self.shell_socket.getsockopt(ROUTING_ID)),
)

msg = await self.shell_socket.recv_multipart()
msg = await self.shell_socket.arecv_multipart().wait()
ident, msg = self.session.feed_identities(msg, copy=True)
try:
msg = self.session.deserialize(msg, content=True, copy=True)
Expand Down
12 changes: 8 additions & 4 deletions ipykernel/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,17 @@ def _bind_socket(self):
def run(self):
"""Run the heartbeat thread."""
self.name = "Heartbeat"
self.socket = self.context.socket(zmq.ROUTER)
self.socket.linger = 1000

try:
self.socket = self.context.socket(zmq.ROUTER)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be a real dumb question, but could/should this be refactored (in another PR maybe) into a context manager:

with self.context.socket(zmq.ROUTER) as self.socket:
    ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, I was just trying to get rid of a failure in test_merge_connection_file, which creates an IPKernelApp, initializes its heartbeat socket and thread, and closes the ZMQ context, leaving the thread working on a closed ZMQ context 😢
It was just working by chance before.

self.socket.linger = 1000
self._bind_socket()
except Exception:
self.socket.close()
raise
try:
self.socket.close()
except Exception:
pass
return

while True:
try:
Expand Down
2 changes: 1 addition & 1 deletion ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class InProcessKernel(IPythonKernel):
_underlying_iopub_socket = Instance(DummySocket, (False,))
iopub_thread: IOPubThread = Instance(IOPubThread) # type:ignore[assignment]

shell_socket = Instance(DummySocket, (True,)) # type:ignore[arg-type]
shell_socket = Instance(DummySocket, (True,))

@default("iopub_thread")
def _default_iopub_thread(self):
Expand Down
2 changes: 1 addition & 1 deletion ipykernel/inprocess/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ async def recv( # type: ignore[override]
mode, content, copy have no effect, but are present for superclass compatibility

"""
return await socket.recv_multipart()
return await socket.arecv_multipart().wait()

def send(
self,
Expand Down
6 changes: 5 additions & 1 deletion ipykernel/inprocess/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,8 @@ async def poll(self, timeout=0):
return statistics.current_buffer_used != 0

def close(self):
pass
if self.is_shell:
self.in_send_stream.close()
self.in_receive_stream.close()
self.out_send_stream.close()
self.out_receive_stream.close()
72 changes: 35 additions & 37 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Any, Callable

import zmq
import zmq_anyio
from anyio import sleep
from jupyter_client.session import extract_header

Expand Down Expand Up @@ -48,7 +49,7 @@ class IOPubThread:
whose IO is always run in a thread.
"""

def __init__(self, socket, pipe=False):
def __init__(self, socket: zmq_anyio.Socket, pipe: bool = False):
"""Create IOPub thread

Parameters
Expand All @@ -61,10 +62,7 @@ def __init__(self, socket, pipe=False):
"""
# ensure all of our sockets as sync zmq.Sockets
# don't create async wrappers until we are within the appropriate coroutines
self.socket: zmq.Socket[bytes] | None = zmq.Socket(socket)
if self.socket.context is None:
# bug in pyzmq, shadow socket doesn't always inherit context attribute
self.socket.context = socket.context # type:ignore[unreachable]
self.socket: zmq_anyio.Socket = socket
self._context = socket.context

self.background_socket = BackgroundSocket(self)
Expand All @@ -78,7 +76,7 @@ def __init__(self, socket, pipe=False):
self._event_pipe_gc_lock: threading.Lock = threading.Lock()
self._event_pipe_gc_seconds: float = 10
self._setup_event_pipe()
tasks = [self._handle_event, self._run_event_pipe_gc]
tasks = [self._handle_event, self._run_event_pipe_gc, self.socket.start]
if pipe:
tasks.append(self._handle_pipe_msgs)
self.thread = BaseThread(name="IOPub", daemon=True)
Expand All @@ -87,7 +85,7 @@ def __init__(self, socket, pipe=False):

def _setup_event_pipe(self):
"""Create the PULL socket listening for events that should fire in this thread."""
self._pipe_in0 = self._context.socket(zmq.PULL, socket_class=zmq.Socket)
self._pipe_in0 = self._context.socket(zmq.PULL)
self._pipe_in0.linger = 0

_uuid = b2a_hex(os.urandom(16)).decode("ascii")
Expand Down Expand Up @@ -122,7 +120,7 @@ def _event_pipe(self):
except AttributeError:
# new thread, new event pipe
# create sync base socket
event_pipe = self._context.socket(zmq.PUSH, socket_class=zmq.Socket)
event_pipe = self._context.socket(zmq.PUSH)
event_pipe.linger = 0
event_pipe.connect(self._event_interface)
self._local.event_pipe = event_pipe
Expand All @@ -141,30 +139,28 @@ async def _handle_event(self):
Whenever *an* event arrives on the event stream,
*all* waiting events are processed in order.
"""
# create async wrapper within coroutine
pipe_in = zmq.asyncio.Socket(self._pipe_in0)
try:
while True:
await pipe_in.recv()
# freeze event count so new writes don't extend the queue
# while we are processing
n_events = len(self._events)
for _ in range(n_events):
event_f = self._events.popleft()
event_f()
except Exception:
if self.thread.stopped.is_set():
return
raise
pipe_in = zmq_anyio.Socket(self._pipe_in0)
async with pipe_in:
try:
while True:
await pipe_in.arecv().wait()
# freeze event count so new writes don't extend the queue
# while we are processing
n_events = len(self._events)
for _ in range(n_events):
event_f = self._events.popleft()
event_f()
except Exception:
if self.thread.stopped.is_set():
return
raise

def _setup_pipe_in(self):
"""setup listening pipe for IOPub from forked subprocesses"""
ctx = self._context

# use UUID to authenticate pipe messages
self._pipe_uuid = os.urandom(16)

self._pipe_in1 = ctx.socket(zmq.PULL, socket_class=zmq.Socket)
self._pipe_in1 = zmq_anyio.Socket(self._context.socket(zmq.PULL))
self._pipe_in1.linger = 0

try:
Expand All @@ -181,19 +177,18 @@ def _setup_pipe_in(self):

async def _handle_pipe_msgs(self):
"""handle pipe messages from a subprocess"""
# create async wrapper within coroutine
self._async_pipe_in1 = zmq.asyncio.Socket(self._pipe_in1)
try:
while True:
await self._handle_pipe_msg()
except Exception:
if self.thread.stopped.is_set():
return
raise
async with self._pipe_in1:
try:
while True:
await self._handle_pipe_msg()
except Exception:
if self.thread.stopped.is_set():
return
raise

async def _handle_pipe_msg(self, msg=None):
"""handle a pipe message from a subprocess"""
msg = msg or await self._async_pipe_in1.recv_multipart()
msg = msg or await self._pipe_in1.arecv_multipart().wait()
if not self._pipe_flag or not self._is_main_process():
return
if msg[0] != self._pipe_uuid:
Expand Down Expand Up @@ -246,7 +241,10 @@ def close(self):
"""Close the IOPub thread."""
if self.closed:
return
self._pipe_in0.close()
try:
self._pipe_in0.close()
except Exception:
pass
if self._pipe_flag:
self._pipe_in1.close()
if self.socket is not None:
Expand Down
9 changes: 5 additions & 4 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from dataclasses import dataclass

import comm
import zmq.asyncio
import zmq_anyio
from anyio import TASK_STATUS_IGNORED, create_task_group, to_thread
from anyio.abc import TaskStatus
from IPython.core import release
Expand Down Expand Up @@ -93,7 +93,7 @@ class IPythonKernel(KernelBase):
help="Set this flag to False to deactivate the use of experimental IPython completion APIs.",
).tag(config=True)

debugpy_socket = Instance(zmq.asyncio.Socket, allow_none=True)
debugpy_socket = Instance(zmq_anyio.Socket, allow_none=True)

user_module = Any()

Expand Down Expand Up @@ -229,7 +229,8 @@ def __init__(self, **kwargs):
}

async def process_debugpy(self):
async with create_task_group() as tg:
assert self.debugpy_socket is not None
async with self.debug_shell_socket, self.debugpy_socket, create_task_group() as tg:
tg.start_soon(self.receive_debugpy_messages)
tg.start_soon(self.poll_stopped_queue)
await to_thread.run_sync(self.debugpy_stop.wait)
Expand All @@ -252,7 +253,7 @@ async def receive_debugpy_message(self, msg=None):

if msg is None:
assert self.debugpy_socket is not None
msg = await self.debugpy_socket.recv_multipart()
msg = await self.debugpy_socket.arecv_multipart().wait()
# The first frame is the socket id, we can drop it
frame = msg[1].decode("utf-8")
self.log.debug("Debugpy received: %s", frame)
Expand Down
Loading
Loading