Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use queue_task instead of direct task creation #16

Merged
merged 4 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.1.3] - 2025-01-26

### Changed
- Replace direct asyncio.create_task with queue_task method in examples
- Clarify worker thread function naming
- Add proper worker state checks in test teardown

## [1.1.2] - 2025-01-24

### Changed
- Improved worker pattern task cancellation
- Simplify task result handling by moving it to try block
- Remove redundant cancelled flag

### Changed

[1.1.1] - 2025-01-24
## [1.1.1] - 2025-01-24

### Changed
- Improved worker pattern implementation:
Expand Down
10 changes: 1 addition & 9 deletions examples/stock_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def on_started(self):

logger.info("[StockService][on_started] started")
self._running = True
self._update_task = asyncio.create_task(self.update_prices())
self.queue_task(self.update_prices())

async def on_stopped(self):
"""
Expand All @@ -149,14 +149,6 @@ async def on_stopped(self):
logger.info("[StockService][on_stopped] stopped")
self._running = False

if hasattr(self, "_update_task"):
self._update_task.cancel()

try:
await self._update_task
except asyncio.CancelledError:
pass

async def update_prices(self):
"""
Periodically update stock prices in a loop.
Expand Down
36 changes: 17 additions & 19 deletions examples/stock_monitor_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,33 +63,33 @@ def data_processed(self):
async def on_started(self, *args, **kwargs):
"""Called when worker starts."""

logger.info("[DataWorker][on_started] Starting")
print(f"[DataWorker][on_started] thread: {threading.current_thread().name}")
self._running = True
self._update_task = asyncio.create_task(self.update_loop())
self._update_task = self.queue_task(self.process_data())

@listener
async def on_stopped(self):
"""Called when worker stops."""

logger.info("[DataWorker][on_stopped] Stopping")
self._running = False
if self._update_task:
self._update_task.cancel()
try:
await self._update_task
except asyncio.CancelledError:
pass

async def update_loop(self):
"""Periodically emits a counter value."""
print(f"[DataWorker][on_stopped] thread: {threading.current_thread().name}")

async def process_data(self):
count = 0

while self._running:
logger.info("[DataWorker] Processing data %d", count)
print(
f"[DataWorker][process_data] data {count} thread: {threading.current_thread().name}"
)

self.data_processed.emit(count)
count += 1
await asyncio.sleep(1)

print(
f"[DataWorker][process_data] END thread: {threading.current_thread().name}"
)


@with_emitters
class DataDisplay:
Expand All @@ -116,14 +116,12 @@ def on_data_processed(self, value):
Logs the received value and simulates a brief processing delay.
"""

current_thread = threading.current_thread()
logger.info(
"[DataDisplay] Received value %d in thread: %s", value, current_thread.name
print(
f"[DataDisplay][on_data_processed] START in thread: {threading.current_thread().name}"
)
self.last_value = value
# Add a small delay to check the result
time.sleep(0.1)
logger.info("[DataDisplay] Processed value %d", value)
time.sleep(0.1) # simulate heavy processing
print(f"[DataDisplay][on_data_processed] END value {value}")


async def main():
Expand Down
79 changes: 79 additions & 0 deletions examples/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import asyncio
import threading
import time
from pynnex import with_emitters, emitter, listener, with_worker


@with_worker
class DataWorker:
def __init__(self):
self._running = False
self._update_task = None
self.started.connect(self.on_started)
self.stopped.connect(self.on_stopped)

@emitter
def data_processed(self):
pass

@listener
async def on_started(self, *args, **kwargs):
print(f"[DataWorker][on_started] thread: {threading.current_thread().name}")
self._running = True
self._update_task = self.queue_task(self.process_data())

@listener
async def on_stopped(self):
self._running = False
print(f"[DataWorker][on_stopped] thread: {threading.current_thread().name}")

# try:
# await self._update_task
# except asyncio.CancelledError:
# pass

async def process_data(self):
count = 0
while self._running:
print(
f"[DataWorker][process_data] data {count} thread: {threading.current_thread().name}"
)
self.data_processed.emit(count)
count += 1
await asyncio.sleep(1)
print(
f"[DataWorker][process_data] END thread: {threading.current_thread().name}"
)


@with_emitters
class DataDisplay:
def __init__(self):
self.last_value = None

@listener
def on_data_processed(self, value):
current_thread = threading.current_thread()
print(
f"[DataDisplay][on_data_processed] START in thread: {current_thread.name}"
)
self.last_value = value
time.sleep(0.1) # simulate heavy processing
print(f"[DataDisplay][on_data_processed] END value {value}")


async def main():
worker = DataWorker()
display = DataDisplay()

worker.data_processed.connect(display, display.on_data_processed)
worker.start()

try:
await asyncio.sleep(3)
finally:
worker.stop()


if __name__ == "__main__":
asyncio.run(main())
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "PynneX"
version = "1.1.2"
version = "1.1.3"
description = "A Python Emitter-Listener library"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
10 changes: 6 additions & 4 deletions src/pynnex/contrib/patterns/worker/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def start(self, *args, **kwargs):
- Emits 'started' emitter when initialized
"""

def _thread_main():
def _worker_thread_main():
loop = asyncio.new_event_loop()
self._nx_loop = loop
asyncio.set_event_loop(loop)
Expand Down Expand Up @@ -221,7 +221,9 @@ async def _flush_preloop_buffer():
)
self.state = WorkerState.STARTING
self._nx_main_loop_coro = self._default_main_loop()
self._nx_thread = threading.Thread(target=_thread_main, daemon=True)
self._nx_thread = threading.Thread(
target=_worker_thread_main, daemon=True
)
self._nx_thread.start()
except Exception as e:
logger_worker.error("Worker start failed: %s", str(e))
Expand All @@ -233,6 +235,7 @@ async def _default_main_loop(self):
Process the task queue: sequentially processes coroutines from self._task_queue
Exits when state is STOPPING/STOPPED
"""

try:
while self.state not in (WorkerState.STOPPING, WorkerState.STOPPED):
task_wrapper = await self._nx_task_queue.get()
Expand All @@ -242,7 +245,6 @@ async def _default_main_loop(self):
break

result = None
errored = False

try:
result = await task_wrapper.coro
Expand All @@ -267,7 +269,6 @@ def cancel_future(tw=task_wrapper):
raise

except Exception as e:
errored = True
logger_worker.exception(
"Error while awaiting the task_wrapper.coro (type=%s): %s",
type(task_wrapper.coro),
Expand All @@ -291,6 +292,7 @@ def cancel_future(tw=task_wrapper):
# Cancel any remaining task_wrapper in the queue after the loop finishes
while not self._nx_task_queue.empty():
tw = self._nx_task_queue.get_nowait()

if tw is not None and tw.future and not tw.future.done():
tw.loop.call_soon_threadsafe(
lambda: tw.future.set_exception(asyncio.CancelledError())
Expand Down
Loading
Loading