Skip to content

Commit

Permalink
WIP numtracker integration
Browse files Browse the repository at this point in the history
  • Loading branch information
callumforrester committed Feb 18, 2025
1 parent 134a554 commit ae63e23
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 15 deletions.
18 changes: 18 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
stomp:
host: "localhost"
port: 61613
auth:
username: "guest"
password: "guest"
# New config
numtracker:
url: https://numtracker.diamond.ac.uk/graphql
env:
# New config
metadata:
data_session: cm37271-2
instrument: i22
oidc:
well_known_url: "https://authn.diamond.ac.uk/realms/master/.well-known/openid-configuration"
client_id: "blueapi-cli"
13 changes: 13 additions & 0 deletions config_local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
stomp:
host: "localhost"
port: 61613
auth:
username: "guest"
password: "guest"
numtracker:
url: http://localhost:8002/graphql
env:
metadata:
data_session: cm123456
instrument: i22
14 changes: 14 additions & 0 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ class WorkerEventConfig(BlueapiBaseModel):
broadcast_status_events: bool = True


class MetadataConfig(BlueapiBaseModel):
# TODO: Reconcile data_session (bluesky term) with instrument_session (DLS term)
data_session: str = "aa123456"

# TODO: Does/should this conflict with the ${BEAMLINE} environment variable
instrument: str = "p01"


class EnvironmentConfig(BlueapiBaseModel):
"""
Config for the RunEngine environment
Expand All @@ -63,6 +71,7 @@ class EnvironmentConfig(BlueapiBaseModel):
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dodal.plan_stubs.wrapped"),
]
events: WorkerEventConfig = Field(default_factory=WorkerEventConfig)
metadata: MetadataConfig = Field(default_factory=MetadataConfig)


class LoggingConfig(BlueapiBaseModel):
Expand Down Expand Up @@ -153,6 +162,10 @@ def id_token_signing_alg_values_supported(self) -> list[str]:
)


class NumtrackerConfig(BlueapiBaseModel):
url: str = "http://localhost:8000/graphql"


class ApplicationConfig(BlueapiBaseModel):
"""
Config for the worker application as a whole. Root of
Expand All @@ -166,6 +179,7 @@ class ApplicationConfig(BlueapiBaseModel):
scratch: ScratchConfig | None = None
oidc: OIDCConfig | None = None
auth_token_path: Path | None = None
numtracker: NumtrackerConfig | None = None

def __eq__(self, other: object) -> bool:
if isinstance(other, ApplicationConfig):
Expand Down
2 changes: 2 additions & 0 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ def find_device(self, addr: str | list[str]) -> Device | None:
return find_component(self.devices, addr)

def with_config(self, config: EnvironmentConfig) -> None:
self.run_engine.md |= config.metadata.model_dump()

for source in config.sources:
mod = import_module(str(source.module))

Expand Down
58 changes: 58 additions & 0 deletions src/blueapi/numtracker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import logging
from collections.abc import Mapping
from pathlib import Path

import requests
from pydantic import Field

from blueapi.utils import BlueapiBaseModel


class NumtrackerNewScanVisit(BlueapiBaseModel):
beamline: str
directory: Path


class NumtrackerNewScanScan(BlueapiBaseModel):
scan_file: str = Field(alias="scanFile")
scan_number: int = Field(alias="scanNumber")
visit: NumtrackerNewScanVisit


class NumtrackerNewScan(BlueapiBaseModel):
scan: NumtrackerNewScanScan


class NumtrackerClient:
def __init__(
self,
url: str,
headers: Mapping[str, str],
) -> None:
self._url = url
self._headers = headers

# TODO: Could make this async, but since it's called from RE.scan_id_source, we would need
# to change the RE to accept an async function in the scan_id_source hook. It's a 1-line
# change but would need to be reviewed etc.
def create_scan(self, visit: str, beamline: str) -> NumtrackerNewScan:
query = {
"query": f'mutation{{scan(beamline: "{beamline}", visit: "{visit}") {{scanFile scanNumber visit{{beamline directory}}}}}}'
}

response = requests.post(
self._url,
headers=self._headers,
json=query,
)

response.raise_for_status()
json = response.json()

if json["data"] is not None:
new_collection = NumtrackerNewScan.model_validate(json["data"])
logging.debug("New NumtrackerNewScan: %s", new_collection)
return new_collection
else:
error_message = json.get("errors", "unknown server error")
raise RuntimeError(error_message)
47 changes: 46 additions & 1 deletion src/blueapi/service/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@

from bluesky_stomp.messaging import StompClient
from bluesky_stomp.models import Broker, DestinationBase, MessageTopic
from dodal.common.beamlines.beamline_utils import (
StartDocumentBasedPathProvider,
get_path_provider,
)

from blueapi.config import ApplicationConfig, OIDCConfig, StompConfig
from blueapi.core.context import BlueskyContext
from blueapi.core.event import EventStream
from blueapi.numtracker import NumtrackerClient
from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
from blueapi.worker.event import TaskStatusEnum, WorkerState
from blueapi.worker.task import Task
Expand Down Expand Up @@ -76,6 +81,27 @@ def stomp_client() -> StompClient | None:
return None


@cache
def numtracker_client() -> NumtrackerClient | None:
conf = config()
cxt = context()
if conf.numtracker is not None:
client = NumtrackerClient(url=conf.numtracker.url, headers={})
cxt.run_engine.scan_id_source = _update_scan_num
return client
else:
return None


def _update_scan_num(md: dict[str, Any]) -> int:
numtracker = numtracker_client()
if numtracker is None:
raise RuntimeError("No idea how you got here?")
scan = numtracker.create_scan(md["data_session"], md["instrument"])
md["data_session_directory"] = str(scan.scan.visit.directory)
return scan.scan.scan_number


def setup(config: ApplicationConfig) -> None:
"""Creates and starts a worker with supplied config"""

Expand All @@ -86,6 +112,16 @@ def setup(config: ApplicationConfig) -> None:
logging.basicConfig(format="%(asctime)s - %(message)s", level=config.logging.level)
worker()
stomp_client()
_hook_run_engine_and_path_provider()


# TODO: Make the path provider ourselves and inject it into dodal, leaving BL modules to define their own offline default
def _hook_run_engine_and_path_provider() -> None:
path_provider = get_path_provider()
run_engine = context().run_engine

if isinstance(path_provider, StartDocumentBasedPathProvider):
run_engine.subscribe(path_provider.update_run, "start")


def teardown() -> None:
Expand Down Expand Up @@ -144,13 +180,22 @@ def clear_task(task_id: str) -> str:
return worker().clear_task(task_id)


def begin_task(task: WorkerTask) -> WorkerTask:
def begin_task(task: WorkerTask, pass_through_headers: Mapping[str, str]) -> WorkerTask:
"""Trigger a task. Will fail if the worker is busy"""
_try_configure_numtracker(pass_through_headers)

if task.task_id is not None:
worker().begin_task(task.task_id)
return task


def _try_configure_numtracker(pass_through_headers: Mapping[str, str]) -> None:
numtracker = numtracker_client()
if numtracker is not None:
# TODO: Make a setter in NumtrackerClient
numtracker._headers = pass_through_headers


def get_tasks_by_status(status: TaskStatusEnum) -> list[TrackableTask]:
"""Retrieve a list of tasks based on their status."""
return worker().get_tasks_by_status(status)
Expand Down
11 changes: 10 additions & 1 deletion src/blueapi/service/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ def get_tasks(
)
@start_as_current_span(TRACER, "task.task_id")
def set_active_task(
request: Request,
task: WorkerTask,
runner: WorkerDispatcher = Depends(_runner),
) -> WorkerTask:
Expand All @@ -313,7 +314,15 @@ def set_active_task(
raise HTTPException(
status_code=status.HTTP_409_CONFLICT, detail="Worker already active"
)
runner.run(interface.begin_task, task)
runner.run(
interface.begin_task,
task=task,
pass_through_headers={
key: request.headers[key]
for key in {"Authorization"}
if key in request.headers
},
)
return task


Expand Down
16 changes: 5 additions & 11 deletions src/blueapi/startup/example_devices.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from pathlib import Path

from dodal.common.beamlines.beamline_utils import set_path_provider
from dodal.common.visit import LocalDirectoryServiceClient, StaticVisitPathProvider
from dodal.common.beamlines.beamline_utils import (
StartDocumentBasedPathProvider,
set_path_provider,
)
from ophyd.sim import Syn2DGauss, SynGauss, SynSignal

from .simmotor import BrokenSynAxis, SynAxisWithMotionEvents
Expand All @@ -10,14 +11,7 @@
# singleton to be set

# Workaround for https://github.com/DiamondLightSource/blueapi/issues/784
_tmp_dir = Path("/does/not/exist")
set_path_provider(
StaticVisitPathProvider(
"t01",
_tmp_dir,
client=LocalDirectoryServiceClient(),
)
)
set_path_provider(StartDocumentBasedPathProvider())


def x(name="x") -> SynAxisWithMotionEvents:
Expand Down
21 changes: 19 additions & 2 deletions src/blueapi/startup/example_plans.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,31 @@
import bluesky.plan_stubs as bps
import bluesky.preprocessors as bpp
from bluesky.protocols import Movable, Readable
from bluesky.utils import MsgGenerator
from dodal.common import inject
from dodal.common.beamlines.beamline_utils import get_path_provider
from dodal.plan_stubs.wrapped import move
from dodal.plans import count

from blueapi.core import MsgGenerator

TEMP: Movable = inject("sample_temperature")
PRESS: Movable = inject("sample_pressure")


def file_writing() -> MsgGenerator[None]:
detectors = ["d1", "d2", "d3"]
provider = get_path_provider()

@bpp.run_decorator()
def inner() -> MsgGenerator[None]:
yield from bps.sleep(0.1)
for detector in detectors:
path_info = provider(detector)
print(f"{detector} -> {path_info}")
yield from bps.sleep(0.1)

yield from inner()


def stp_snapshot(
detectors: list[Readable],
temperature: Movable = TEMP,
Expand Down

0 comments on commit ae63e23

Please sign in to comment.