diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 5d6d4fdd..c9f5ea8d 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -7,7 +7,7 @@ repos: - id: trailing-whitespace - id: check-toml - repo: https://github.com/psf/black-pre-commit-mirror - rev: 24.10.0 + rev: 25.1.0 hooks: - id: black # It is recommended to specify the latest version of Python @@ -16,13 +16,13 @@ repos: # https://pre-commit.com/#top_level-default_language_version language_version: python3.11 - repo: https://github.com/pycqa/isort - rev: 5.13.2 + rev: 6.0.0 hooks: - id: isort name: isort (python) - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.7.4 + rev: v0.9.3 hooks: - id: ruff - repo: https://github.com/numpy/numpydoc diff --git a/doc/changes/DM-35396.api.rst b/doc/changes/DM-35396.api.rst new file mode 100644 index 00000000..c3d3a870 --- /dev/null +++ b/doc/changes/DM-35396.api.rst @@ -0,0 +1 @@ +The Quantum ID is now passed through the executors so it can be recorded in the provenance by ``QuantumContext``. diff --git a/doc/changes/DM-35396.feature.rst b/doc/changes/DM-35396.feature.rst new file mode 100644 index 00000000..15dcf11b --- /dev/null +++ b/doc/changes/DM-35396.feature.rst @@ -0,0 +1,2 @@ +* Quantum metadata outputs now record the IDs of all output datasets. + This is stored in an ``outputs`` key. diff --git a/python/lsst/ctrl/mpexec/cmdLineFwk.py b/python/lsst/ctrl/mpexec/cmdLineFwk.py index c02706f8..d695774b 100644 --- a/python/lsst/ctrl/mpexec/cmdLineFwk.py +++ b/python/lsst/ctrl/mpexec/cmdLineFwk.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Module defining CmdLineFwk class and related methods. -""" +"""Module defining CmdLineFwk class and related methods.""" from __future__ import annotations diff --git a/python/lsst/ctrl/mpexec/mpGraphExecutor.py b/python/lsst/ctrl/mpexec/mpGraphExecutor.py index 0a918cc3..57f965c0 100644 --- a/python/lsst/ctrl/mpexec/mpGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/mpGraphExecutor.py @@ -37,6 +37,7 @@ import sys import threading import time +import uuid from collections.abc import Iterable from enum import Enum from typing import Literal @@ -124,7 +125,15 @@ def start( mp_ctx = multiprocessing.get_context(startMethod) self.process = mp_ctx.Process( # type: ignore[attr-defined] target=_Job._executeJob, - args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast), + args=( + qe_pickle, + task_node_pickle, + quantum_pickle, + self.qnode.nodeId, + logConfigState, + snd_conn, + self._fail_fast, + ), name=f"task-{self.qnode.quantum.dataId}", ) # mypy is getting confused by multiprocessing. @@ -138,6 +147,7 @@ def _executeJob( quantumExecutor_pickle: bytes, task_node_pickle: bytes, quantum_pickle: bytes, + quantum_id: uuid.UUID | None, logConfigState: list, snd_conn: multiprocessing.connection.Connection, fail_fast: bool, @@ -180,7 +190,7 @@ def _executeJob( # Catch a few known failure modes and stop the process immediately, # with exception-specific exit code. try: - _, report = quantumExecutor.execute(task_node, quantum) + _, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id) except RepeatableQuantumError as exc: report = QuantumReport.from_exception( exception=exc, @@ -530,7 +540,9 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None: # exception-specific exit code, but we still want to start # debugger before exiting if debugging is enabled. try: - _, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum) + _, quantum_report = self.quantumExecutor.execute( + task_node, qnode.quantum, quantum_id=qnode.nodeId + ) if quantum_report: report.quantaReports.append(quantum_report) successCount += 1 diff --git a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py index d53302c9..f5f48958 100644 --- a/python/lsst/ctrl/mpexec/quantumGraphExecutor.py +++ b/python/lsst/ctrl/mpexec/quantumGraphExecutor.py @@ -35,6 +35,8 @@ from .reports import QuantumReport, Report if TYPE_CHECKING: + import uuid + from lsst.daf.butler import Quantum from lsst.pipe.base import QuantumGraph from lsst.pipe.base.pipeline_graph import TaskNode @@ -50,7 +52,9 @@ class QuantumExecutor(ABC): """ @abstractmethod - def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]: + def execute( + self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None + ) -> tuple[Quantum, QuantumReport | None]: """Execute single quantum. Parameters @@ -59,6 +63,8 @@ def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, Qu Task definition structure. quantum : `~lsst.daf.butler.Quantum` Quantum for this execution. + quantum_id : `uuid.UUID` or `None`, optional + The ID of the quantum to be executed. Returns ------- diff --git a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py index d0361894..356715b7 100644 --- a/python/lsst/ctrl/mpexec/singleQuantumExecutor.py +++ b/python/lsst/ctrl/mpexec/singleQuantumExecutor.py @@ -32,6 +32,7 @@ # ------------------------------- import logging import time +import uuid from collections import defaultdict from collections.abc import Callable from itertools import chain @@ -168,18 +169,22 @@ def __init__( collectionTypes=CollectionType.RUN, ) - def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]: + def execute( + self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None + ) -> tuple[Quantum, QuantumReport | None]: # Docstring inherited from QuantumExecutor.execute assert quantum.dataId is not None, "Quantum DataId cannot be None" if self.butler is not None: self.butler.registry.refresh() - result = self._execute(task_node, quantum) + result = self._execute(task_node, quantum, quantum_id=quantum_id) report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label) return result, report - def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum: + def _execute( + self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None + ) -> Quantum: """Execute the quantum. Internal implementation of `execute()`. @@ -268,7 +273,9 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum: task = self.taskFactory.makeTask(task_node, limited_butler, init_input_refs) logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type] try: - quantumMetadata["caveats"] = self.runQuantum(task, quantum, task_node, limited_butler).value + caveats, outputsPut = self.runQuantum( + task, quantum, task_node, limited_butler, quantum_id=quantum_id + ) except Exception as e: _LOG.error( "Execution of task '%s' on quantum %s failed. Exception %s: %s", @@ -278,6 +285,11 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum: str(e), ) raise + else: + quantumMetadata["caveats"] = caveats.value + # Stringify the UUID for easier compatibility with + # PropertyList. + quantumMetadata["outputs"] = [str(output) for output in outputsPut] logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type] fullMetadata = task.getFullMetadata() fullMetadata["quantum"] = quantumMetadata @@ -475,7 +487,8 @@ def runQuantum( task_node: TaskNode, /, limited_butler: LimitedButler, - ) -> QuantumSuccessCaveats: + quantum_id: uuid.UUID | None = None, + ) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]: """Execute task on a single quantum. Parameters @@ -488,16 +501,21 @@ def runQuantum( Task definition structure. limited_butler : `~lsst.daf.butler.LimitedButler` Butler to use for dataset I/O. + quantum_id : `uuid.UUID` or `None`, optional + ID of the quantum being executed. Returns ------- flags : `QuantumSuccessCaveats` Flags that describe qualified successes. + ids_put : list[ `uuid.UUID` ] + Record of all the dataset IDs that were written by this quantum + being executed. """ flags = QuantumSuccessCaveats.NO_CAVEATS # Create a butler that operates in the context of a quantum - butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources) + butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id) # Get the input and output references for the task inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum) @@ -546,7 +564,8 @@ def runQuantum( flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING if not butlerQC.outputsPut == butlerQC.allOutputs: flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING - return flags + ids_put = [output[2] for output in butlerQC.outputsPut] + return flags, ids_put def writeMetadata( self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler diff --git a/python/lsst/ctrl/mpexec/util.py b/python/lsst/ctrl/mpexec/util.py index 8559367c..c9ca65bc 100644 --- a/python/lsst/ctrl/mpexec/util.py +++ b/python/lsst/ctrl/mpexec/util.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Few utility methods used by the rest of a package. -""" +"""Few utility methods used by the rest of a package.""" __all__ = ["printTable", "filterTaskNodes", "subTaskIter"] diff --git a/tests/test_cliCmdCleanup.py b/tests/test_cliCmdCleanup.py index 39f97375..c0908866 100644 --- a/tests/test_cliCmdCleanup.py +++ b/tests/test_cliCmdCleanup.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Unit tests for ctrl_mpexec CLI cleanup subcommand. -""" +"""Unit tests for ctrl_mpexec CLI cleanup subcommand.""" import os diff --git a/tests/test_cliCmdPurge.py b/tests/test_cliCmdPurge.py index 22592ead..ecd1f4c7 100644 --- a/tests/test_cliCmdPurge.py +++ b/tests/test_cliCmdPurge.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Unit tests for ctrl_mpexec CLI purge subcommand. -""" +"""Unit tests for ctrl_mpexec CLI purge subcommand.""" import os diff --git a/tests/test_cliUtils.py b/tests/test_cliUtils.py index 2ba23524..901af334 100644 --- a/tests/test_cliUtils.py +++ b/tests/test_cliUtils.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Unit tests for the daf_butler shared CLI options. -""" +"""Unit tests for the daf_butler shared CLI options.""" import unittest diff --git a/tests/test_cmdLineFwk.py b/tests/test_cmdLineFwk.py index 03b4a06d..84d079c0 100644 --- a/tests/test_cmdLineFwk.py +++ b/tests/test_cmdLineFwk.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Simple unit test for cmdLineFwk module. -""" +"""Simple unit test for cmdLineFwk module.""" import contextlib import logging diff --git a/tests/test_executors.py b/tests/test_executors.py index f301286b..67d41418 100644 --- a/tests/test_executors.py +++ b/tests/test_executors.py @@ -59,6 +59,7 @@ from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph if TYPE_CHECKING: + import uuid from collections.abc import Iterator from multiprocessing.managers import ListProxy @@ -88,7 +89,11 @@ def __init__(self, mp: bool = False): self.quanta = manager.list() def execute( # type: ignore[override] - self, task_node: TaskNodeMock, /, quantum: QuantumMock # type: ignore[override] + self, + task_node: TaskNodeMock, + /, + quantum: QuantumMock, # type: ignore[override] + quantum_id: uuid.UUID | None = None, ) -> tuple[QuantumMock, QuantumReport | None]: _LOG.debug("QuantumExecutorMock.execute: task_node=%s dataId=%s", task_node, quantum.dataId) self._execute_called = True diff --git a/tests/test_preExecInit.py b/tests/test_preExecInit.py index 27f0d7d2..94d6b3a3 100644 --- a/tests/test_preExecInit.py +++ b/tests/test_preExecInit.py @@ -25,8 +25,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -"""Simple unit test for PreExecInit class. -""" +"""Simple unit test for PreExecInit class.""" import contextlib import shutil