Skip to content

Commit

Permalink
Add required service interface integration
Browse files Browse the repository at this point in the history
  • Loading branch information
abbiemery committed Feb 28, 2025
1 parent 49c97cf commit a1e7920
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
29 changes: 28 additions & 1 deletion src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.task import Task
from blueapi.worker.task_worker import TaskWorker, TrackableTask
from blueapi.client.numtracker import NumtrackerClient

"""This module provides interface between web application and underlying Bluesky
context and worker"""
Expand Down Expand Up @@ -75,6 +76,25 @@ def stomp_client() -> StompClient | None:
else:
return None

@cache
def numtracker_client() -> NumtrackerClient | None:
conf = config()
cxt = context()
if conf.numtracker is not None:
client = NumtrackerClient(url=conf.numtracker.url, headers={})
cxt.run_engine.scan_id_source = _update_scan_num
return client
else:
return None


def _update_scan_num(md: dict[str, Any]) -> int:
numtracker = numtracker_client()
if numtracker is None:
raise RuntimeError("No idea how you got here?")
scan = numtracker.create_scan(md["data_session"], md["instrument"])
md["data_session_directory"] = str(scan.scan.visit.directory)
return scan.scan.scan_number

def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""
Expand Down Expand Up @@ -144,12 +164,19 @@ def clear_task(task_id: str) -> str:
return worker().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
def begin_task(task: WorkerTask, pass_through_headers: Mapping[str, str]) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
_try_configure_numtracker(pass_through_headers)

if task.task_id is not None:
worker().begin_task(task.task_id)
return task

def _try_configure_numtracker(pass_through_headers: Mapping[str, str]) -> None:
numtracker = numtracker_client()
if numtracker is not None:
# TODO: Make a setter in NumtrackerClient
numtracker._headers = pass_through_headers

def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
"""Retrieve a list of tasks based on their status."""
Expand Down
11 changes: 10 additions & 1 deletion src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ def get_tasks(
)
@start_as_current_span(TRACER, "task.task_id")
def set_active_task(
request: Request,
task: WorkerTask,
runner: WorkerDispatcher = Depends(_runner),
) -> WorkerTask:
Expand All @@ -313,7 +314,15 @@ def set_active_task(
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Worker already active"
)
runner.run(interface.begin_task, task)
runner.run(
interface.begin_task,
task=task,
pass_through_headers={
key: request.headers[key]
for key in {"Authorization"}
if key in request.headers
},
)
return task


Expand Down

0 comments on commit a1e7920

Please sign in to comment.