Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-35396: Pass quantum ID down to executor so it can be used in provenance #321

Merged
merged 5 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ repos:
- id: trailing-whitespace
- id: check-toml
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.10.0
rev: 25.1.0
hooks:
- id: black
# It is recommended to specify the latest version of Python
Expand All @@ -16,13 +16,13 @@ repos:
# https://pre-commit.com/#top_level-default_language_version
language_version: python3.11
- repo: https://github.com/pycqa/isort
rev: 5.13.2
rev: 6.0.0
hooks:
- id: isort
name: isort (python)
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.7.4
rev: v0.9.3
hooks:
- id: ruff
- repo: https://github.com/numpy/numpydoc
Expand Down
1 change: 1 addition & 0 deletions doc/changes/DM-35396.api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The Quantum ID is now passed through the executors so it can be recorded in the provenance by ``QuantumContext``.
2 changes: 2 additions & 0 deletions doc/changes/DM-35396.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
* Quantum metadata outputs now record the IDs of all output datasets.
This is stored in an ``outputs`` key.
3 changes: 1 addition & 2 deletions python/lsst/ctrl/mpexec/cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Module defining CmdLineFwk class and related methods.
"""
"""Module defining CmdLineFwk class and related methods."""

from __future__ import annotations

Expand Down
18 changes: 15 additions & 3 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import sys
import threading
import time
import uuid
from collections.abc import Iterable
from enum import Enum
from typing import Literal
Expand Down Expand Up @@ -124,7 +125,15 @@ def start(
mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process( # type: ignore[attr-defined]
target=_Job._executeJob,
args=(qe_pickle, task_node_pickle, quantum_pickle, logConfigState, snd_conn, self._fail_fast),
args=(
qe_pickle,
task_node_pickle,
quantum_pickle,
self.qnode.nodeId,
logConfigState,
snd_conn,
self._fail_fast,
),
name=f"task-{self.qnode.quantum.dataId}",
)
# mypy is getting confused by multiprocessing.
Expand All @@ -138,6 +147,7 @@ def _executeJob(
quantumExecutor_pickle: bytes,
task_node_pickle: bytes,
quantum_pickle: bytes,
quantum_id: uuid.UUID | None,
logConfigState: list,
snd_conn: multiprocessing.connection.Connection,
fail_fast: bool,
Expand Down Expand Up @@ -180,7 +190,7 @@ def _executeJob(
# Catch a few known failure modes and stop the process immediately,
# with exception-specific exit code.
try:
_, report = quantumExecutor.execute(task_node, quantum)
_, report = quantumExecutor.execute(task_node, quantum, quantum_id=quantum_id)
except RepeatableQuantumError as exc:
report = QuantumReport.from_exception(
exception=exc,
Expand Down Expand Up @@ -530,7 +540,9 @@ def _executeQuantaInProcess(self, graph: QuantumGraph, report: Report) -> None:
# exception-specific exit code, but we still want to start
# debugger before exiting if debugging is enabled.
try:
_, quantum_report = self.quantumExecutor.execute(task_node, qnode.quantum)
_, quantum_report = self.quantumExecutor.execute(
task_node, qnode.quantum, quantum_id=qnode.nodeId
)
if quantum_report:
report.quantaReports.append(quantum_report)
successCount += 1
Expand Down
8 changes: 7 additions & 1 deletion python/lsst/ctrl/mpexec/quantumGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
from .reports import QuantumReport, Report

if TYPE_CHECKING:
import uuid

from lsst.daf.butler import Quantum
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.pipeline_graph import TaskNode
Expand All @@ -50,7 +52,9 @@ class QuantumExecutor(ABC):
"""

@abstractmethod
def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
def execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> tuple[Quantum, QuantumReport | None]:
"""Execute single quantum.

Parameters
Expand All @@ -59,6 +63,8 @@ def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, Qu
Task definition structure.
quantum : `~lsst.daf.butler.Quantum`
Quantum for this execution.
quantum_id : `uuid.UUID` or `None`, optional
The ID of the quantum to be executed.

Returns
-------
Expand Down
33 changes: 26 additions & 7 deletions python/lsst/ctrl/mpexec/singleQuantumExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
# -------------------------------
import logging
import time
import uuid
from collections import defaultdict
from collections.abc import Callable
from itertools import chain
Expand Down Expand Up @@ -168,18 +169,22 @@ def __init__(
collectionTypes=CollectionType.RUN,
)

def execute(self, task_node: TaskNode, /, quantum: Quantum) -> tuple[Quantum, QuantumReport | None]:
def execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> tuple[Quantum, QuantumReport | None]:
# Docstring inherited from QuantumExecutor.execute
assert quantum.dataId is not None, "Quantum DataId cannot be None"

if self.butler is not None:
self.butler.registry.refresh()

result = self._execute(task_node, quantum)
result = self._execute(task_node, quantum, quantum_id=quantum_id)
report = QuantumReport(dataId=quantum.dataId, taskLabel=task_node.label)
return result, report

def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
def _execute(
self, task_node: TaskNode, /, quantum: Quantum, quantum_id: uuid.UUID | None = None
) -> Quantum:
"""Execute the quantum.

Internal implementation of `execute()`.
Expand Down Expand Up @@ -268,7 +273,9 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
task = self.taskFactory.makeTask(task_node, limited_butler, init_input_refs)
logInfo(None, "start", metadata=quantumMetadata) # type: ignore[arg-type]
try:
quantumMetadata["caveats"] = self.runQuantum(task, quantum, task_node, limited_butler).value
caveats, outputsPut = self.runQuantum(
task, quantum, task_node, limited_butler, quantum_id=quantum_id
)
except Exception as e:
_LOG.error(
"Execution of task '%s' on quantum %s failed. Exception %s: %s",
Expand All @@ -278,6 +285,11 @@ def _execute(self, task_node: TaskNode, /, quantum: Quantum) -> Quantum:
str(e),
)
raise
else:
quantumMetadata["caveats"] = caveats.value
# Stringify the UUID for easier compatibility with
# PropertyList.
quantumMetadata["outputs"] = [str(output) for output in outputsPut]
logInfo(None, "end", metadata=quantumMetadata) # type: ignore[arg-type]
fullMetadata = task.getFullMetadata()
fullMetadata["quantum"] = quantumMetadata
Expand Down Expand Up @@ -475,7 +487,8 @@ def runQuantum(
task_node: TaskNode,
/,
limited_butler: LimitedButler,
) -> QuantumSuccessCaveats:
quantum_id: uuid.UUID | None = None,
) -> tuple[QuantumSuccessCaveats, list[uuid.UUID]]:
"""Execute task on a single quantum.

Parameters
Expand All @@ -488,16 +501,21 @@ def runQuantum(
Task definition structure.
limited_butler : `~lsst.daf.butler.LimitedButler`
Butler to use for dataset I/O.
quantum_id : `uuid.UUID` or `None`, optional
ID of the quantum being executed.

Returns
-------
flags : `QuantumSuccessCaveats`
Flags that describe qualified successes.
ids_put : list[ `uuid.UUID` ]
Record of all the dataset IDs that were written by this quantum
being executed.
"""
flags = QuantumSuccessCaveats.NO_CAVEATS

# Create a butler that operates in the context of a quantum
butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources)
butlerQC = QuantumContext(limited_butler, quantum, resources=self.resources, quantum_id=quantum_id)

# Get the input and output references for the task
inputRefs, outputRefs = task_node.get_connections().buildDatasetRefs(quantum)
Expand Down Expand Up @@ -546,7 +564,8 @@ def runQuantum(
flags |= QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
if not butlerQC.outputsPut == butlerQC.allOutputs:
flags |= QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
return flags
ids_put = [output[2] for output in butlerQC.outputsPut]
return flags, ids_put

def writeMetadata(
self, quantum: Quantum, metadata: Any, task_node: TaskNode, /, limited_butler: LimitedButler
Expand Down
3 changes: 1 addition & 2 deletions python/lsst/ctrl/mpexec/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Few utility methods used by the rest of a package.
"""
"""Few utility methods used by the rest of a package."""

__all__ = ["printTable", "filterTaskNodes", "subTaskIter"]

Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliCmdCleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for ctrl_mpexec CLI cleanup subcommand.
"""
"""Unit tests for ctrl_mpexec CLI cleanup subcommand."""


import os
Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliCmdPurge.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for ctrl_mpexec CLI purge subcommand.
"""
"""Unit tests for ctrl_mpexec CLI purge subcommand."""


import os
Expand Down
3 changes: 1 addition & 2 deletions tests/test_cliUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""Unit tests for the daf_butler shared CLI options.
"""
"""Unit tests for the daf_butler shared CLI options."""

import unittest

Expand Down
3 changes: 1 addition & 2 deletions tests/test_cmdLineFwk.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Simple unit test for cmdLineFwk module.
"""
"""Simple unit test for cmdLineFwk module."""

import contextlib
import logging
Expand Down
7 changes: 6 additions & 1 deletion tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from lsst.pipe.base.tests.simpleQGraph import AddTaskFactoryMock, makeSimpleQGraph

if TYPE_CHECKING:
import uuid
from collections.abc import Iterator
from multiprocessing.managers import ListProxy

Expand Down Expand Up @@ -88,7 +89,11 @@ def __init__(self, mp: bool = False):
self.quanta = manager.list()

def execute( # type: ignore[override]
self, task_node: TaskNodeMock, /, quantum: QuantumMock # type: ignore[override]
self,
task_node: TaskNodeMock,
/,
quantum: QuantumMock, # type: ignore[override]
quantum_id: uuid.UUID | None = None,
) -> tuple[QuantumMock, QuantumReport | None]:
_LOG.debug("QuantumExecutorMock.execute: task_node=%s dataId=%s", task_node, quantum.dataId)
self._execute_called = True
Expand Down
3 changes: 1 addition & 2 deletions tests/test_preExecInit.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""Simple unit test for PreExecInit class.
"""
"""Simple unit test for PreExecInit class."""

import contextlib
import shutil
Expand Down
Loading