From a1e7920ed24be1cccadf6f214e3ad59c382d7949 Mon Sep 17 00:00:00 2001 From: Abigail Emery Date: Fri, 28 Feb 2025 17:34:17 +0000 Subject: [PATCH] Add required service interface integration --- src/blueapi/service/interface.py | 29 ++++++++++++++++++++++++++++- src/blueapi/service/main.py | 11 ++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/blueapi/service/interface.py b/src/blueapi/service/interface.py index 803841964..64e06add2 100644 --- a/src/blueapi/service/interface.py +++ b/src/blueapi/service/interface.py @@ -13,6 +13,7 @@ from blueapi.worker.event import TaskStatusEnum, WorkerState from blueapi.worker.task import Task from blueapi.worker.task_worker import TaskWorker, TrackableTask +from blueapi.client.numtracker import NumtrackerClient """This module provides interface between web application and underlying Bluesky context and worker""" @@ -75,6 +76,25 @@ def stomp_client() -> StompClient | None: else: return None +@cache +def numtracker_client() -> NumtrackerClient | None: + conf = config() + cxt = context() + if conf.numtracker is not None: + client = NumtrackerClient(url=conf.numtracker.url, headers={}) + cxt.run_engine.scan_id_source = _update_scan_num + return client + else: + return None + + +def _update_scan_num(md: dict[str, Any]) -> int: + numtracker = numtracker_client() + if numtracker is None: + raise RuntimeError("No idea how you got here?") + scan = numtracker.create_scan(md["data_session"], md["instrument"]) + md["data_session_directory"] = str(scan.scan.visit.directory) + return scan.scan.scan_number def setup(config: ApplicationConfig) -> None: """Creates and starts a worker with supplied config""" @@ -144,12 +164,19 @@ def clear_task(task_id: str) -> str: return worker().clear_task(task_id) -def begin_task(task: WorkerTask) -> WorkerTask: +def begin_task(task: WorkerTask, pass_through_headers: Mapping[str, str]) -> WorkerTask: """Trigger a task. Will fail if the worker is busy""" + _try_configure_numtracker(pass_through_headers) + if task.task_id is not None: worker().begin_task(task.task_id) return task +def _try_configure_numtracker(pass_through_headers: Mapping[str, str]) -> None: + numtracker = numtracker_client() + if numtracker is not None: + # TODO: Make a setter in NumtrackerClient + numtracker._headers = pass_through_headers def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]: """Retrieve a list of tasks based on their status.""" diff --git a/src/blueapi/service/main.py b/src/blueapi/service/main.py index 3e779c518..6fe55e358 100644 --- a/src/blueapi/service/main.py +++ b/src/blueapi/service/main.py @@ -303,6 +303,7 @@ def get_tasks( ) @start_as_current_span(TRACER, "task.task_id") def set_active_task( + request: Request, task: WorkerTask, runner: WorkerDispatcher = Depends(_runner), ) -> WorkerTask: @@ -313,7 +314,15 @@ def set_active_task( raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Worker already active" ) - runner.run(interface.begin_task, task) + runner.run( + interface.begin_task, + task=task, + pass_through_headers={ + key: request.headers[key] + for key in {"Authorization"} + if key in request.headers + }, + ) return task