Refactor RecordFactory to allow per-universe implementations
timj committed Jan 16, 2025
1 parent 36545c9 commit f0de6ed
Showing 2 changed files with 212 additions and 83 deletions.
2 changes: 1 addition & 1 deletion python/lsst/daf/butler/registry/obscore/_manager.py
@@ -167,7 +167,7 @@ def __init__(
dimensions,
SqlQueryContext(self.db, column_type_info),
)
self.record_factory = RecordFactory(
self.record_factory = RecordFactory.get_record_type_from_universe(universe)(
config, schema, universe, spatial_plugins, exposure_region_factory
)
self.tagged_collection: str | None = None
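The manager-side change resolves the factory class for the active universe and then instantiates it, all in one expression. Unrolled for clarity (a sketch only; the argument names are those of the call above):

```python
# Two-step equivalent of the nested call in _manager.py:
# first resolve the class for this universe, then construct it.
record_factory_cls = RecordFactory.get_record_type_from_universe(universe)
self.record_factory = record_factory_cls(
    config, schema, universe, spatial_plugins, exposure_region_factory
)
```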
293 changes: 211 additions & 82 deletions python/lsst/daf/butler/registry/obscore/_records.py
@@ -33,6 +33,7 @@
import warnings
from abc import abstractmethod
from collections.abc import Callable, Collection, Mapping
from importlib.metadata import entry_points
from typing import TYPE_CHECKING, Any, cast
from uuid import UUID

@@ -46,6 +46,7 @@
if TYPE_CHECKING:
from lsst.sphgeom import Region

from ._config import DatasetTypeConfig
from ._schema import ObsCoreSchema
from ._spatial import SpatialObsCorePlugin

@@ -120,15 +122,96 @@ def __init__(
self.visit = universe["visit"]
self.physical_filter = cast(Dimension, universe["physical_filter"])

def make_generic_records(self, ref: DatasetRef, dataset_config: DatasetTypeConfig) -> Record:
"""Fill record content that is not associated with a specific universe.
Parameters
----------
ref : `DatasetRef`
Dataset ref, its DataId must be in expanded form.
dataset_config : `DatasetTypeConfig`
The configuration for the dataset type.
Returns
-------
record : `dict` [ `str`, `typing.Any` ]
Record content with generic content filled in that does not
depend on a specific universe.
"""
record: dict[str, str | int | float | UUID | None] = {}

record["dataproduct_type"] = dataset_config.dataproduct_type
record["dataproduct_subtype"] = dataset_config.dataproduct_subtype
record["o_ucd"] = dataset_config.o_ucd
record["calib_level"] = dataset_config.calib_level
if dataset_config.obs_collection is not None:
record["obs_collection"] = dataset_config.obs_collection
else:
record["obs_collection"] = self.config.obs_collection
record["access_format"] = dataset_config.access_format

dataId = ref.dataId
dataset_type_name = ref.datasetType.name

# Dictionary to use for substitutions when formatting various
# strings from configuration.
fmt_kws: dict[str, Any] = dict(records=dataId.records)
fmt_kws.update(dataId.mapping)
fmt_kws.update(id=ref.id)
fmt_kws.update(run=ref.run)
fmt_kws.update(dataset_type=dataset_type_name)
fmt_kws.update(record)
if dataset_config.obs_id_fmt:
record["obs_id"] = dataset_config.obs_id_fmt.format(**fmt_kws)
fmt_kws["obs_id"] = record["obs_id"]

if dataset_config.datalink_url_fmt:
record["access_url"] = dataset_config.datalink_url_fmt.format(**fmt_kws)

extra_columns = {}
if self.config.extra_columns:
extra_columns.update(self.config.extra_columns)
if dataset_config.extra_columns:
extra_columns.update(dataset_config.extra_columns)
for key, column_value in extra_columns.items():
# Try to expand the template with known keys; if expansion
# fails due to a missing key, leave the column unset.
if isinstance(column_value, ExtraColumnConfig):
try:
value = column_value.template.format(**fmt_kws)
record[key] = _TYPE_CONVERSION[column_value.type](value)
except KeyError:
pass
else:
# Just a static value.
record[key] = column_value

return record
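The extra-column expansion above is plain `str.format` substitution over a dictionary of known keys. A minimal standalone sketch (the keys and template here are invented for illustration; the real `fmt_kws` is built from the data ID, the dataset ref, and any record values already computed):

```python
# Hypothetical keys and template, for illustration only.
fmt_kws = {"instrument": "LSSTCam", "exposure": 1234, "dataset_type": "raw"}
template = "{instrument}_{exposure}"
try:
    value = template.format(**fmt_kws)  # -> "LSSTCam_1234"
except KeyError:
    value = None  # Unknown key in the template: leave the column unset.
```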

@abstractmethod
def make_universe_records(self, ref: DatasetRef) -> Record:
"""Create universe-specific record content.
Parameters
----------
ref : `DatasetRef`
Dataset ref, its DataId must be in expanded form.
Returns
-------
record : `dict` [ `str`, `typing.Any` ]
Record content populated using algorithms specific to this
dimension universe.
"""
raise NotImplementedError()

def __call__(self, ref: DatasetRef) -> Record | None:
"""Make an ObsCore record from a dataset.
Parameters
----------
ref : `DatasetRef`
Dataset ref, its DataId must be in expanded form.
context : `SqlQueryContext`
Context used to execute queries for additional dimension metadata.
Returns
-------
@@ -150,7 +233,6 @@ def __call__(self, ref: DatasetRef) -> Record | None:
if dataset_config is None:
return None

dataId = ref.dataId
# _LOG.debug("New record, dataId=%s", dataId.full)
# _LOG.debug("New record, records=%s", dataId.records)

@@ -160,15 +242,127 @@ def __call__(self, ref: DatasetRef) -> Record | None:
# everything with None.
record = {field.name: None for field in self.schema.table_spec.fields}

record["dataproduct_type"] = dataset_config.dataproduct_type
record["dataproduct_subtype"] = dataset_config.dataproduct_subtype
record["o_ucd"] = dataset_config.o_ucd
record["calib_level"] = dataset_config.calib_level
if dataset_config.obs_collection is not None:
record["obs_collection"] = dataset_config.obs_collection
else:
record["obs_collection"] = self.config.obs_collection
record["access_format"] = dataset_config.access_format
record.update(self.make_generic_records(ref, dataset_config))
record.update(self.make_universe_records(ref))

return record

def make_spatial_records(self, region: Region | None, warn: bool = False, msg: str = "") -> Record:
"""Make spatial records for a given region.
Parameters
----------
region : `~lsst.sphgeom.Region` or `None`
Spacial region to convert to record.
warn : `bool`, optional
If `False`, an exception will be raised if the type of region
is not supported. If `True` a warning will be issued and an
empty `dict` returned.
msg : `str`, optional
Message to use in warning. Generic message will be used if not
given. This message will be used to annotate any `RegionTypeError`
exception raised if defined.
Returns
-------
record : `dict`
Record items.
Raises
------
RegionTypeError
Raised if type of the region is not supported and ``warn`` is
`False`.
"""
record = Record()
try:
# Ask each plugin for its values to add to a record.
for plugin in self.spatial_plugins:
plugin_record = plugin.make_records(region)
if plugin_record is not None:
record.update(plugin_record)
except RegionTypeError as exc:
if warn:
if not msg:
msg = "Failed to convert obscore region"
warnings.warn(
f"{msg}: {exc}",
category=RegionTypeWarning,
stacklevel=find_outside_stacklevel("lsst.daf.butler"),
)
# Clear the record.
record = Record()
else:
if msg:
exc.add_note(msg)
raise
return record
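The `warn` flag selects between the two failure modes when a region type is not supported. A usage sketch (assumes `factory` is a constructed `RecordFactory` subclass and `region` is a region of an unsupported type, with `RegionTypeError` in scope):

```python
# warn=True: RegionTypeError is converted to a RegionTypeWarning and an
# empty record is returned.
records = factory.make_spatial_records(region, warn=True, msg="dataset 1234")

# warn=False (the default): RegionTypeError propagates to the caller,
# annotated with msg when one is given.
try:
    records = factory.make_spatial_records(region, msg="dataset 1234")
except RegionTypeError:
    records = {}
```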

@classmethod
def get_record_type_from_universe(cls, universe: DimensionUniverse) -> type[RecordFactory]:
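"""Return the `RecordFactory` subclass appropriate for the given
dimension universe: either the built-in daf_butler implementation or
one registered via a ``butler.obscore_factory`` entry point.
"""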
namespace = universe.namespace
if namespace == "daf_butler":
return DafButlerRecordFactory
# Check for entry points.
plugins = {p.name: p for p in entry_points(group="butler.obscore_factory")}
if namespace in plugins:
func = plugins[namespace].load()
# The entry point function is required to return the class that
# should be used for the RecordFactory for this universe.
record_factory_type = func()
if not issubclass(record_factory_type, RecordFactory):
raise ValueError(
f"Entry point for universe {namespace} did not return a RecordFactory subclass. "
f"Returned {record_factory_type!r}."
)
return record_factory_type
raise ValueError(f"Unable to load record factory dynamically for universe namespace {namespace}")


class DafButlerRecordFactory(RecordFactory):
"""Class that implements conversion of dataset information to ObsCore
using the daf_butler dimension universe namespace.
Parameters
----------
config : `ObsCoreConfig`
Complete configuration specifying conversion options.
schema : `ObsCoreSchema`
Description of obscore schema.
universe : `DimensionUniverse`
Registry dimensions universe.
spatial_plugins : `~collections.abc.Collection` of `SpatialObsCorePlugin`
Spatial plugins.
exposure_region_factory : `ExposureRegionFactory`, optional
Manager for Registry dimensions.
"""

def __init__(
self,
config: ObsCoreConfig,
schema: ObsCoreSchema,
universe: DimensionUniverse,
spatial_plugins: Collection[SpatialObsCorePlugin],
exposure_region_factory: ExposureRegionFactory | None = None,
):
super().__init__(
config=config,
schema=schema,
universe=universe,
spatial_plugins=spatial_plugins,
exposure_region_factory=exposure_region_factory,
)

# All dimension elements used below.
self.band = cast(Dimension, universe["band"])
self.exposure = universe["exposure"]
self.visit = universe["visit"]
self.physical_filter = cast(Dimension, universe["physical_filter"])

def make_universe_records(self, ref: DatasetRef) -> Record:
# Construct records using the daf_butler dimension universe.
dataId = ref.dataId
record: dict[str, str | int | float | UUID | None] = {}

instrument_name = cast(str, dataId.get("instrument"))
record["instrument_name"] = instrument_name
@@ -195,17 +389,12 @@ def __call__(self, ref: DatasetRef) -> Record | None:
elif self.visit.name in dataId and (dimension_record := dataId.records[self.visit.name]) is not None:
self._visit_records(dimension_record, record)

# ask each plugin for its values to add to a record.
try:
plugin_records = self.make_spatial_records(region)
except RegionTypeError as exc:
warnings.warn(
f"Failed to convert region for obscore dataset {ref.id}: {exc}",
category=RegionTypeWarning,
stacklevel=find_outside_stacklevel("lsst.daf.butler"),
# Create spatial records.
record.update(
self.make_spatial_records(
region, warn=True, msg=f"Failed to convert region for obscore dataset {ref.id}"
)
else:
record.update(plugin_records)
)

if self.band.name in dataId:
em_range = None
@@ -221,66 +410,6 @@ def __call__(self, ref: DatasetRef) -> Record | None:
_LOG.warning("could not find spectral range for dataId=%s", dataId)
record["em_filter_name"] = dataId["band"]

# Dictionary to use for substitutions when formatting various
# strings.
fmt_kws: dict[str, Any] = dict(records=dataId.records)
fmt_kws.update(dataId.mapping)
fmt_kws.update(id=ref.id)
fmt_kws.update(run=ref.run)
fmt_kws.update(dataset_type=dataset_type_name)
fmt_kws.update(record)
if dataset_config.obs_id_fmt:
record["obs_id"] = dataset_config.obs_id_fmt.format(**fmt_kws)
fmt_kws["obs_id"] = record["obs_id"]

if dataset_config.datalink_url_fmt:
record["access_url"] = dataset_config.datalink_url_fmt.format(**fmt_kws)

# add extra columns
extra_columns = {}
if self.config.extra_columns:
extra_columns.update(self.config.extra_columns)
if dataset_config.extra_columns:
extra_columns.update(dataset_config.extra_columns)
for key, column_value in extra_columns.items():
# Try to expand the template with known keys; if expansion
# fails due to a missing key name, store None.
if isinstance(column_value, ExtraColumnConfig):
try:
value = column_value.template.format(**fmt_kws)
record[key] = _TYPE_CONVERSION[column_value.type](value)
except KeyError:
pass
else:
# Just a static value.
record[key] = column_value

return record

def make_spatial_records(self, region: Region | None) -> Record:
"""Make spatial records for a given region.
Parameters
----------
region : `~lsst.sphgeom.Region` or `None`
Spacial region to convert to record.
Returns
-------
record : `dict`
Record items.
Raises
------
RegionTypeError
Raised if type of the region is not supported.
"""
record = Record()
# ask each plugin for its values to add to a record.
for plugin in self.spatial_plugins:
plugin_record = plugin.make_records(region)
if plugin_record is not None:
record.update(plugin_record)
return record

def _exposure_records(self, dimension_record: DimensionRecord, record: dict[str, Any]) -> None:
