Skip to content

Commit

Permalink
refactor: improve worker pattern implementation (#13)
Browse files Browse the repository at this point in the history
- Refactor worker pattern to use state machine (CREATED, STARTING, STARTED, STOPPING, STOPPED)
- Add thread-safe task queue with pre-loop buffering
- Improve worker lifecycle management with proper cleanup
- Make worker state transitions more explicit and safer
- Replace run() method with started signal listener pattern
- Fix thread cleanup in tests
- Add detailed debug logging for worker operations
- Update examples

Co-authored-by: San <san.tekart@gmail.com>
  • Loading branch information
nexconnectio and san-tekart authored Jan 25, 2025
1 parent 68adcbb commit 334d42a
Show file tree
Hide file tree
Showing 23 changed files with 1,039 additions and 649 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.1.1] - 2025-01-24

### Changed
- Improved worker pattern implementation:
- Added state machine (CREATED, STARTING, STARTED, STOPPING, STOPPED)
- Added thread-safe task queue with pre-loop buffering
- Replaced run() method with started signal listener pattern
- Enhanced worker lifecycle management and cleanup
- Added detailed debug logging for worker operations
- Updated documentation and examples:
- Added complete runnable worker examples
- Updated API documentation to reflect new worker pattern
- Improved README examples with async context

## [1.1.0] - 2025-01-16

### Changed
Expand Down
196 changes: 134 additions & 62 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pip install -e .

For development (includes tests and linting tools):
```
pip install -e ".[dev]
pip install -e ".[dev]"
```

## Quick Hello (Emitters/Listeners)
Expand All @@ -71,32 +71,41 @@ Here’s the simplest “Hello, Emitters/Listeners” example. Once installed, r

```python
# hello_pynnex.py
import asyncio
from pynnex import with_emitters, emitter, listener


@with_emitters
class Greeter:
@emitter
def greet(self):
"""Emitter emitted when greeting happens."""
pass

def say_hello(self):
self.greet.emit("Hello from PynneX!")


@with_emitters
class Printer:
@listener
def on_greet(self, message):
print(message)

greeter = Greeter()
printer = Printer()

# Connect the emitter to the listener
greeter.greet.connect(printer, printer.on_greet)
async def main():
# The following code needs to be inside async main() as it requires a running event loop
greeter = Greeter()
printer = Printer()

# Connect the emitter to the listener
greeter.greet.connect(printer, printer.on_greet)

# Fire the emitter
greeter.say_hello()
# Fire the emitter
greeter.say_hello()


if __name__ == "__main__":
asyncio.run(main())
```

**Output:**
Expand All @@ -109,45 +118,81 @@ By simply defining `emitter` and `listener`, you can set up intuitive event hand
If you come from a Qt background or prefer “signal-slot” naming, use:

```python
import asyncio
from pynnex import with_signals, signal, slot


@with_signals
class Greeter:
@signal
def greet(self):
"""Signal that fires a greeting event."""
pass
"""Emitter emitted when greeting happens."""

def say_hello(self):
self.greet.emit("Hello from PynneX!")


@with_signals
class Printer:
@slot
def on_greet(self, message):
print(f"Received: {message}")
print(message)


async def main():
# The following code needs to be inside async main() as it requires a running event loop
greeter = Greeter()
printer = Printer()

# Connect the emitter to the listener
greeter.greet.connect(printer, printer.on_greet)

# Fire the emitter
greeter.say_hello()


if __name__ == "__main__":
asyncio.run(main())
```

If you prefer a Pub/Sub style, use:

```python
from pynnex import with_signals, signal, slot
import asyncio
from pynnex import with_publishers, publisher, subscriber


@with_publishers
class Greeter:
@publisher
def greet(self):
"""Publisher that fires a greeting event."""
pass
"""Emitter emitted when greeting happens."""

def say_hello(self):
self.greet.emit("Hello from PynneX!")
self.greet.publish("Hello from PynneX!")


@with_subscribers
@with_publishers
class Printer:
@subscriber
def on_greet(self, message):
print(f"Received: {message}")
print(message)


async def main():
# The following code needs to be inside async main() as it requires a running event loop
greeter = Greeter()
printer = Printer()

# Connect the emitter to the listener
greeter.greet.connect(printer, printer.on_greet)

# Fire the emitter
greeter.say_hello()


if __name__ == "__main__":
asyncio.run(main())
```

They’re all interchangeable aliases pointing to the same core functionality.
Expand All @@ -158,57 +203,47 @@ They’re all interchangeable aliases pointing to the same core functionality.

Below are some brief examples. For more, see the [docs/](https://github.com/nexconnectio/pynnex/blob/main/docs/) directory.

### Basic Counter & Display
### Asynchronous Listener Example
```python
import asyncio
from pynnex import with_emitters, emitter, listener


@with_emitters
class Counter:
def __init__(self):
self.count = 0

@emitter
def count_changed(self):
pass

def increment(self):
self.count += 1
self.count_changed.emit(self.count)


@with_emitters
class Display:
@listener
async def on_count_changed(self, value):
print(f"Count is now: {value}")

# Connect and use
counter = Counter()
display = Display()
counter.count_changed.connect(display, display.on_count_changed)
counter.increment() # Will print: "Count is now: 1"
```

### Asynchronous Listener Example
```python
@with_emitters
class AsyncDisplay:
@listener
async def on_count_changed(self, value):
await asyncio.sleep(1) # Simulate async operation
print(f"Count updated to: {value}")

# Usage in async context
async def main():
# Connect and use
counter = Counter()
display = AsyncDisplay()

display = Display()
counter.count_changed.connect(display, display.on_count_changed)
counter.increment()

# Wait for async processing
await asyncio.sleep(1.1)
counter.increment() # Will print: "Count is now: 1"

# Wait a bit to allow async listener to execute
await asyncio.sleep(0.1)


if __name__ == "__main__":
asyncio.run(main())

asyncio.run(main())
```

## Core Concepts
Expand All @@ -231,28 +266,39 @@ This mechanism frees you from manually dispatching calls across threads.
The `@nx_property` decorator provides thread-safe property access with automatic emitter emission:

```python
import asyncio
from pynnex import with_emitters, emitter, nx_property


@with_emitters
class Example:
def __init__(self):
super().__init__()
self._data = None

@emitter
def updated(self):
"""Emitter emitted when data changes."""
pass


@nx_property(notify=updated)
def data(self):
"""Thread-safe property with change notification."""
return self._data

@data.setter
def data(self, value):
self._data = value
print(f"Data set to: {value}")


async def main():
example = Example()
example.data = 42 # Thread-safe property set; emits 'updated' emitter on change
await asyncio.sleep(0.1)

e = Example()
e.data = 42 # Thread-safe property set; emits 'updated' emitter on change

if __name__ == "__main__":
asyncio.run(main())
```

### Worker Threads
Expand All @@ -265,32 +311,58 @@ For background work, PynneX provides a `@nx_with_worker` decorator that:

**Worker Example**
```python
from pynnex import nx_with_worker, emitter
import asyncio
from pynnex import with_worker, emitter, listener


@with_worker
class DataProcessor:
def __init__(self):
self.started.connect(self.on_started)
self.processing_done.connect(self.on_processing_done)
self.result = None

@emitter
def processing_done(self):
"""Emitted when processing completes"""

async def run(self, *args, **kwargs):
# The main entry point for the worker thread’s event loop
# Wait for tasks or stopping emitter
await self.wait_for_stop()
@listener
async def on_started(self, *args, **kwargs):
"""Called when worker starts"""
print("Worker started, processing data...")
await self.process_data(42)

@listener
def on_processing_done(self, result):
"""Called when processing completes"""
self.result = result
print(f"Processing complete! Result: {result}")

async def process_data(self, data):
# Perform heavy computation in the worker thread
result = await heavy_computation(data)
"""Perform heavy computation in the worker thread"""
await asyncio.sleep(2) # Simulate heavy computation
result = data * 2
self.processing_done.emit(result)

processor = DataProcessor()
processor.start()

# Queue a task to run in the worker thread:
processor.queue_task(processor.process_data(some_data))
async def main():
# Create and start the processor
processor = DataProcessor()
processor.start()

# Wait for processing to complete
await asyncio.sleep(3)

# Stop the worker gracefully
processor.stop()

# Verify the result
assert processor.result == 84, f"Expected 84, got {processor.result}"
print("Worker example completed successfully!")


# Stop the worker gracefully
processor.stop()
if __name__ == "__main__":
asyncio.run(main())
```

## Documentation and Example
Expand Down
Loading

0 comments on commit 334d42a

Please sign in to comment.