diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5d6d4fdd..c9f5ea8d 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
- rev: 24.10.0
+ rev: 25.1.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
@@ -16,13 +16,13 @@ repos:
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.11
- repo: https://github.com/pycqa/isort
- rev: 5.13.2
+ rev: 6.0.0
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
- rev: v0.7.4
+ rev: v0.9.3
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
diff --git a/doc/changes/DM-35396.api.rst b/doc/changes/DM-35396.api.rst
new file mode 100644
index 00000000..c3d3a870
--- /dev/null
+++ b/doc/changes/DM-35396.api.rst
@@ -0,0 +1 @@
+The Quantum ID is now passed through the executors so it can be recorded in the provenance by ``QuantumContext``.
diff --git a/doc/changes/DM-35396.feature.rst b/doc/changes/DM-35396.feature.rst
new file mode 100644
index 00000000..15dcf11b
--- /dev/null
+++ b/doc/changes/DM-35396.feature.rst
@@ -0,0 +1,2 @@
+* Quantum metadata outputs now record the IDs of all output datasets.
+ This is stored in an ``outputs`` key.
diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py
index c02706f8..d695774b 100644
--- a/python/lsst/ctrl/mpexec/cmdLineFwk.py
+++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Module defining CmdLineFwk class and related methods.
-"""
+"""Module defining CmdLineFwk class and related methods."""
from __future__ import annotations
diff --git a/python/lsst/ctrl/mpexec/mpGraphExecutor.py b/python/lsst/ctrl/mpexec/mpGraphExecutor.py
index 0a918cc3..57f965c0 100644
--- a/python/lsst/ctrl/mpexec/mpGraphExecutor.py
+++ b/python/lsst/ctrl/mpexec/mpGraphExecutor.py
@@ -37,6 +37,7 @@
import sys
import threading
import time
+import uuid
from collections.abc import Iterable
from enum import Enum
from typing import Literal
@@ -124,7 +125,15 @@ def start(
mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
- args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
+ args=(
+ qe_pickle,
+ task_node_pickle,
+ quantum_pickle,
+ self.qnode.nodeId,
+ logConfigState,
+ snd_conn,
+ self._fail_fast,
+ ),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
@@ -138,6 +147,7 @@ def _executeJob(
quantumExecutor_pickle: bytes,
task_node_pickle: bytes,
quantum_pickle: bytes,
+ quantum_id: uuid.UUID | None,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
fail_fast: bool,
@@ -180,7 +190,7 @@ def _executeJob(
# Catch a few known failure modes and stop the process immediately,
# with exception-specific exit code.
try:
- _, report = quantumExecutor.execute(task_node, quantum)
+ _, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id)
except RepeatableQuantumError as exc:
report = QuantumReport.from_exception(
exception=exc,
@@ -530,7 +540,9 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
# exception-specific exit code, but we still want to start
# debugger before exiting if debugging is enabled.
try:
- _, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
+ _, quantum_report = self.quantumExecutor.execute(
+ task_node, qnode.quantum, quantum_id=qnode.nodeId
+ )
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
diff --git a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py
index d53302c9..f5f48958 100644
--- a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py
+++ b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py
@@ -35,6 +35,8 @@
from .reports import QuantumReport, Report
if TYPE_CHECKING:
+ import uuid
+
from lsst.daf.butler import Quantum
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.pipeline_graph import TaskNode
@@ -50,7 +52,9 @@ class QuantumExecutor(ABC):
"""
@abstractmethod
- def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
+ def execute(
+ self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
+ ) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.
Parameters
@@ -59,6 +63,8 @@ def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, Qu
Task definition structure.
quantum : `~lsst.daf.butler.Quantum`
Quantum for this execution.
+ quantum_id : `uuid.UUID` or `None`, optional
+ The ID of the quantum to be executed.
Returns
-------
diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
index d0361894..356715b7 100644
--- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
+++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py
@@ -32,6 +32,7 @@
# -------------------------------
import logging
import time
+import uuid
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
@@ -168,18 +169,22 @@ def __init__(
collectionTypes=CollectionType.RUN,
)
- def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
+ def execute(
+ self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
+ ) -> tuple[Quantum, QuantumReport | None]:
# Docstring inherited from QuantumExecutor.execute
assert quantum.dataId is not None, "Quantum DataId cannot be None"
if self.butler is not None:
self.butler.registry.refresh()
- result = self._execute(task_node, quantum)
+ result = self._execute(task_node, quantum, quantum_id=quantum_id)
report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result, report
- def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
+ def _execute(
+ self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
+ ) -> Quantum:
"""Execute the quantum.
Internal implementation of `execute()`.
@@ -268,7 +273,9 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
task = self.taskFactory.makeTask(task_node, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
try:
- quantumMetadata["caveats"] = self.runQuantum(task, quantum, task_node, limited_butler).value
+ caveats, outputsPut = self.runQuantum(
+ task, quantum, task_node, limited_butler, quantum_id=quantum_id
+ )
except Exception as e:
_LOG.error(
"Execution of task '%s' on quantum %s failed. Exception %s: %s",
@@ -278,6 +285,11 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
str(e),
)
raise
+ else:
+ quantumMetadata["caveats"] = caveats.value
+ # Stringify the UUID for easier compatibility with
+ # PropertyList.
+ quantumMetadata["outputs"] = [str(output) for output in outputsPut]
logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type]
fullMetadata = task.getFullMetadata()
fullMetadata["quantum"] = quantumMetadata
@@ -475,7 +487,8 @@ def runQuantum(
task_node: TaskNode,
/,
limited_butler: LimitedButler,
- ) -> QuantumSuccessCaveats:
+ quantum_id: uuid.UUID | None = None,
+ ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
"""Execute task on a single quantum.
Parameters
@@ -488,16 +501,21 @@ def runQuantum(
Task definition structure.
limited_butler : `~lsst.daf.butler.LimitedButler`
Butler to use for dataset I/O.
+ quantum_id : `uuid.UUID` or `None`, optional
+ ID of the quantum being executed.
Returns
-------
flags : `QuantumSuccessCaveats`
Flags that describe qualified successes.
+ ids_put : list[ `uuid.UUID` ]
+ Record of all the dataset IDs that were written by this quantum
+ being executed.
"""
flags = QuantumSuccessCaveats.NO_CAVEATS
# Create a butler that operates in the context of a quantum
- butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources)
+ butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)
# Get the input and output references for the task
inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)
@@ -546,7 +564,8 @@ def runQuantum(
flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
if not butlerQC.outputsPut == butlerQC.allOutputs:
flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
- return flags
+ ids_put = [output[2] for output in butlerQC.outputsPut]
+ return flags, ids_put
def writeMetadata(
self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
diff --git a/python/lsst/ctrl/mpexec/util.py b/python/lsst/ctrl/mpexec/util.py
index 8559367c..c9ca65bc 100644
--- a/python/lsst/ctrl/mpexec/util.py
+++ b/python/lsst/ctrl/mpexec/util.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Few utility methods used by the rest of a package.
-"""
+"""Few utility methods used by the rest of a package."""
__all__ = ["printTable", "filterTaskNodes", "subTaskIter"]
diff --git a/tests/test_cliCmdCleanup.py b/tests/test_cliCmdCleanup.py
index 39f97375..c0908866 100644
--- a/tests/test_cliCmdCleanup.py
+++ b/tests/test_cliCmdCleanup.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Unit tests for ctrl_mpexec CLI cleanup subcommand.
-"""
+"""Unit tests for ctrl_mpexec CLI cleanup subcommand."""
import os
diff --git a/tests/test_cliCmdPurge.py b/tests/test_cliCmdPurge.py
index 22592ead..ecd1f4c7 100644
--- a/tests/test_cliCmdPurge.py
+++ b/tests/test_cliCmdPurge.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Unit tests for ctrl_mpexec CLI purge subcommand.
-"""
+"""Unit tests for ctrl_mpexec CLI purge subcommand."""
import os
diff --git a/tests/test_cliUtils.py b/tests/test_cliUtils.py
index 2ba23524..901af334 100644
--- a/tests/test_cliUtils.py
+++ b/tests/test_cliUtils.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Unit tests for the daf_butler shared CLI options.
-"""
+"""Unit tests for the daf_butler shared CLI options."""
import unittest
diff --git a/tests/test_cmdLineFwk.py b/tests/test_cmdLineFwk.py
index 03b4a06d..84d079c0 100644
--- a/tests/test_cmdLineFwk.py
+++ b/tests/test_cmdLineFwk.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Simple unit test for cmdLineFwk module.
-"""
+"""Simple unit test for cmdLineFwk module."""
import contextlib
import logging
diff --git a/tests/test_executors.py b/tests/test_executors.py
index f301286b..67d41418 100644
--- a/tests/test_executors.py
+++ b/tests/test_executors.py
@@ -59,6 +59,7 @@
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph
if TYPE_CHECKING:
+ import uuid
from collections.abc import Iterator
from multiprocessing.managers import ListProxy
@@ -88,7 +89,11 @@ def __init__(self, mp: bool = False):
self.quanta = manager.list()
def execute( # type: ignore[override]
- self, task_node: TaskNodeMock, /, quantum: QuantumMock # type: ignore[override]
+ self,
+ task_node: TaskNodeMock,
+ /,
+ quantum: QuantumMock, # type: ignore[override]
+ quantum_id: uuid.UUID | None = None,
) -> tuple[QuantumMock, QuantumReport | None]:
_LOG.debug("QuantumExecutorMock.execute: task_node=%s dataId=%s", task_node, quantum.dataId)
self._execute_called = True
diff --git a/tests/test_preExecInit.py b/tests/test_preExecInit.py
index 27f0d7d2..94d6b3a3 100644
--- a/tests/test_preExecInit.py
+++ b/tests/test_preExecInit.py
@@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-"""Simple unit test for PreExecInit class.
-"""
+"""Simple unit test for PreExecInit class."""
import contextlib
import shutil