Skip to content

Commit

Permalink
Try to remove background thread
Browse files Browse the repository at this point in the history
  • Loading branch information
marcelldls committed Dec 4, 2024
1 parent 7d602ce commit 3e127ed
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 130 deletions.
43 changes: 17 additions & 26 deletions src/fastcs/backend.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import asyncio
from collections import defaultdict
from collections.abc import Callable
from concurrent.futures import Future
from types import MethodType

from softioc.asyncio_dispatcher import AsyncioDispatcher

from .attributes import AttrR, AttrW, Sender, Updater
from .controller import Controller, SingleMapping
from .exceptions import FastCSException
Expand All @@ -15,49 +12,43 @@ class Backend:
def __init__(
self,
controller: Controller,
loop: asyncio.AbstractEventLoop | None = None,
loop: asyncio.AbstractEventLoop,
):
self.dispatcher = AsyncioDispatcher(loop)
self._loop = self.dispatcher.loop
self._loop = loop
self._controller = controller

self._initial_coros = [controller.connect]
self._scan_futures: set[Future] = set()

asyncio.run_coroutine_threadsafe(
self._controller.initialise(), self._loop
).result()
self._scan_tasks: set[asyncio.Task] = set()

loop.run_until_complete(self._controller.initialise())
self._link_process_tasks()
loop.run_until_complete(self._run_initial_tasks())

def _link_process_tasks(self):
for single_mapping in self._controller.get_controller_mappings():
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)

def __del__(self):
self.stop_scan_futures()
self.stop_scan_tasks()

def run(self):
self._run_initial_futures()
self.start_scan_futures()
async def run(self):
await self.start_scan_tasks()

def _run_initial_futures(self):
async def _run_initial_tasks(self):
for coro in self._initial_coros:
future = asyncio.run_coroutine_threadsafe(coro(), self._loop)
future.result()
await coro()

def start_scan_futures(self):
self._scan_futures = {
asyncio.run_coroutine_threadsafe(coro(), self._loop)
for coro in _get_scan_coros(self._controller)
async def start_scan_tasks(self):
self._scan_tasks = {
self._loop.create_task(coro()) for coro in _get_scan_coros(self._controller)
}

def stop_scan_futures(self):
for future in self._scan_futures:
if not future.done():
def stop_scan_tasks(self):
for task in self._scan_tasks:
if not task.done():
try:
future.cancel()
task.cancel()
except asyncio.CancelledError:
pass

Expand Down
84 changes: 49 additions & 35 deletions src/fastcs/launch.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import inspect
import json
from pathlib import Path
Expand All @@ -16,7 +17,7 @@
from .transport.tango.options import TangoOptions

# Define a type alias for transport options
TransportOptions: TypeAlias = EpicsOptions | TangoOptions | RestOptions
TransportOptions: TypeAlias = list[EpicsOptions | TangoOptions | RestOptions]


class FastCS:
Expand All @@ -25,41 +26,56 @@ def __init__(
controller: Controller,
transport_options: TransportOptions,
):
self._backend = Backend(controller)
self._transport: TransportAdapter
match transport_options:
case EpicsOptions():
from .transport.epics.adapter import EpicsTransport

self._transport = EpicsTransport(
controller,
self._backend.dispatcher,
transport_options,
)
case TangoOptions():
from .transport.tango.adapter import TangoTransport

self._transport = TangoTransport(
controller,
transport_options,
)
case RestOptions():
from .transport.rest.adapter import RestTransport

self._transport = RestTransport(
controller,
transport_options,
)
self._loop = asyncio.get_event_loop()
self._backend = Backend(controller, self._loop)
transport: TransportAdapter
self._transports: list[TransportAdapter] = []
for option in transport_options:
match option:
case EpicsOptions():
from .transport.epics.adapter import EpicsTransport

transport = EpicsTransport(
controller,
self._loop,
option,
)
case TangoOptions():
from .transport.tango.adapter import TangoTransport

transport = TangoTransport(
controller,
self._loop,
option,
)
case RestOptions():
from .transport.rest.adapter import RestTransport

transport = RestTransport(
controller,
option,
)
self._transports.append(transport)

def create_docs(self) -> None:
self._transport.create_docs()
for transport in self._transports:
if hasattr(transport.options, "docs"):
transport.create_docs()

def create_gui(self) -> None:
self._transport.create_gui()
for transport in self._transports:
if hasattr(transport.options, "gui"):
transport.create_docs()

def run(self) -> None:
self._backend.run()
self._transport.run()
def run(self):
self._loop.run_until_complete(
self.serve(),
)

async def serve(self) -> None:
coros = [self._backend.run()]
coros.extend([transport.run() for transport in self._transports])
await asyncio.gather(*coros)


def launch(controller_class: type[Controller]) -> None:
Expand Down Expand Up @@ -140,10 +156,8 @@ def run(
instance_options.transport,
)

if "gui" in options_yaml["transport"]:
instance.create_gui()
if "docs" in options_yaml["transport"]:
instance.create_docs()
instance.create_gui()
instance.create_docs()
instance.run()

return launch_typer
Expand Down
8 changes: 7 additions & 1 deletion src/fastcs/transport/adapter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from abc import ABC, abstractmethod
from typing import Any


class TransportAdapter(ABC):
@property
@abstractmethod
def run(self) -> None:
def options(self) -> Any:
pass

@abstractmethod
async def run(self) -> None:
pass

@abstractmethod
Expand Down
18 changes: 12 additions & 6 deletions src/fastcs/transport/epics/adapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from softioc.asyncio_dispatcher import AsyncioDispatcher
import asyncio

from fastcs.controller import Controller
from fastcs.transport.adapter import TransportAdapter
Expand All @@ -13,20 +13,26 @@ class EpicsTransport(TransportAdapter):
def __init__(
self,
controller: Controller,
dispatcher: AsyncioDispatcher,
loop: asyncio.AbstractEventLoop,
options: EpicsOptions | None = None,
) -> None:
self.options = options or EpicsOptions()
self._controller = controller
self._dispatcher = dispatcher
self._loop = loop
self._options = options or EpicsOptions()
self._pv_prefix = self.options.ioc.pv_prefix
self._ioc = EpicsIOC(self.options.ioc.pv_prefix, controller)

@property
def options(self) -> EpicsOptions:
return self._options

def create_docs(self) -> None:
EpicsDocs(self._controller).create_docs(self.options.docs)

def create_gui(self) -> None:
EpicsGUI(self._controller, self._pv_prefix).create_gui(self.options.gui)

def run(self):
self._ioc.run(self._dispatcher)
async def run(self):
self._ioc.run(self._loop)
while True: # Keep running without terminal:
await asyncio.sleep(0.1)
16 changes: 9 additions & 7 deletions src/fastcs/transport/epics/ioc.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import Callable
from dataclasses import asdict
from types import MethodType
Expand Down Expand Up @@ -60,17 +61,18 @@ def __init__(

def run(
self,
dispatcher: AsyncioDispatcher,
loop: asyncio.AbstractEventLoop,
) -> None:
dispatcher = AsyncioDispatcher(loop) # Needs running loop
builder.LoadDatabase()
softioc.iocInit(dispatcher)

if self.options.terminal:
context = {
"dispatcher": dispatcher,
"controller": self._controller,
}
softioc.interactive_ioc(context)
# if False:
# context = {
# "dispatcher": dispatcher,
# "controller": self._controller,
# }
# softioc.interactive_ioc(context)


def _add_pvi_info(
Expand Down
12 changes: 9 additions & 3 deletions src/fastcs/transport/rest/adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from collections.abc import Coroutine

from fastcs.controller import Controller
from fastcs.transport.adapter import TransportAdapter

Expand All @@ -11,14 +13,18 @@ def __init__(
controller: Controller,
options: RestOptions | None = None,
):
self.options = options or RestOptions()
self._options = options or RestOptions()
self._server = RestServer(controller)

@property
def options(self) -> RestOptions:
return self._options

def create_docs(self) -> None:
raise NotImplementedError

def create_gui(self) -> None:
raise NotImplementedError

def run(self) -> None:
self._server.run(self.options.rest)
def run(self) -> Coroutine:
return self._server.run(self.options.rest)
21 changes: 15 additions & 6 deletions src/fastcs/transport/rest/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@ def _create_app(self):

return app

def run(self, options: RestServerOptions | None) -> None:
async def run(self, options: RestServerOptions | None):
options = options or RestServerOptions()
uvicorn.run(
self._app,
host=options.host,
port=options.port,
log_level=options.log_level,
self._serv = uvicorn.Server(
uvicorn.Config(
app=self._app,
host=options.host,
port=options.port,
log_level=options.log_level,
)
)
await self._serv.serve()
# uvicorn.run(
# self._app,
# host=options.host,
# port=options.port,
# log_level=options.log_level,
# )


def _put_request_body(attribute: AttrW[T]):
Expand Down
19 changes: 15 additions & 4 deletions src/fastcs/transport/tango/adapter.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import asyncio

from fastcs.controller import Controller
from fastcs.transport.adapter import TransportAdapter

Expand All @@ -9,16 +11,25 @@ class TangoTransport(TransportAdapter):
def __init__(
self,
controller: Controller,
loop: asyncio.AbstractEventLoop | None = None,
options: TangoOptions | None = None,
):
self.options = options or TangoOptions()
self._dsr = TangoDSR(controller)
self._options = options or TangoOptions()
self._dsr = TangoDSR(controller, loop)

@property
def options(self) -> TangoOptions:
return self._options

def create_docs(self) -> None:
raise NotImplementedError

def create_gui(self) -> None:
raise NotImplementedError

def run(self) -> None:
self._dsr.run(self.options.dsr)
async def run(self) -> None:
coro = asyncio.to_thread(
self._dsr.run,
self.options.dsr,
)
await coro
Loading

0 comments on commit 3e127ed

Please sign in to comment.