Skip to content

Commit

Permalink
Merge pull request #322 from lsst/tickets/DM-48536
Browse files Browse the repository at this point in the history
DM-48536: improvements and testing for `pipetask report`
  • Loading branch information
TallJimbo authored Feb 9, 2025
2 parents 0f3ec9b + 78a2d0d commit 1164789
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 136 deletions.
138 changes: 9 additions & 129 deletions python/lsst/ctrl/mpexec/cli/script/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from astropy.table import Table

from lsst.daf.butler import Butler
from lsst.pipe.base import QuantumGraph, QuantumSuccessCaveats
from lsst.pipe.base import QuantumGraph
from lsst.pipe.base.execution_reports import QuantumGraphExecutionReport
from lsst.pipe.base.quantum_provenance_graph import QuantumProvenanceGraph, Summary

Expand Down Expand Up @@ -235,137 +235,17 @@ def print_summary(summary: Summary, full_output_filename: str | None, brief: boo
Parameters
----------
summary : `QuantumProvenanceGraph.Summary`
This `Pydantic` model contains all the information derived from the
`QuantumProvenanceGraph`.
This `Pydantic` model contains all the information derived from the
`QuantumProvenanceGraph`.
full_output_filename : `str | None`
Name of the JSON file in which to store summary information, if
passed.
Name of the JSON file in which to store summary information, if
passed.
brief : `bool`
Only display short (counts-only) summary on stdout. This includes
counts and not error messages or data_ids (similar to BPS report).
This option will still report all `cursed` datasets and `wonky`
quanta.
Only display short (counts-only) summary on stdout. This includes
counts and not error messages or data_ids (similar to BPS report).
Ignored (considered `False`) if ``full_output_filename`` is passed.
"""
quanta_table = []
failed_quanta_table = []
wonky_quanta_table = []
for label, task_summary in summary.tasks.items():
if task_summary.n_wonky > 0:
print(
f"{label} has produced wonky quanta. Recommend processing cease until the issue is resolved."
)
for quantum_summary in task_summary.wonky_quanta:
wonky_quanta_table.append(
{
"Task": label,
"Data ID": quantum_summary.data_id,
"Runs and Status": quantum_summary.runs,
"Messages": quantum_summary.messages,
}
)
if len(task_summary.caveats) > 1:
caveats = "(multiple)"
elif len(task_summary.caveats) == 1:
((code, data_ids),) = task_summary.caveats.items()
caveats = f"{code}({len(data_ids)})"
else:
caveats = ""
quanta_table.append(
{
"Task": label,
"Unknown": task_summary.n_unknown,
"Successful": task_summary.n_successful,
"Caveats": caveats,
"Blocked": task_summary.n_blocked,
"Failed": task_summary.n_failed,
"Wonky": task_summary.n_wonky,
"TOTAL": sum(
[
task_summary.n_successful,
task_summary.n_unknown,
task_summary.n_blocked,
task_summary.n_failed,
task_summary.n_wonky,
]
),
"EXPECTED": task_summary.n_expected,
}
)
if task_summary.failed_quanta:
for quantum_summary in task_summary.failed_quanta:
failed_quanta_table.append(
{
"Task": label,
"Data ID": quantum_summary.data_id,
"Runs and Status": quantum_summary.runs,
"Messages": quantum_summary.messages,
}
)
quanta = Table(quanta_table)
quanta.pprint_all()
print("")
print("Caveat codes:")
for k, v in QuantumSuccessCaveats.legend().items():
print(f"{k}: {v}")
print("")
# Dataset loop
dataset_table = []
cursed_datasets = []
unsuccessful_datasets = {}
for dataset_type_name, dataset_type_summary in summary.datasets.items():
dataset_table.append(
{
"Dataset": dataset_type_name,
"Visible": dataset_type_summary.n_visible,
"Shadowed": dataset_type_summary.n_shadowed,
"Predicted Only": dataset_type_summary.n_predicted_only,
"Unsuccessful": dataset_type_summary.n_unsuccessful,
"Cursed": dataset_type_summary.n_cursed,
"TOTAL": sum(
[
dataset_type_summary.n_visible,
dataset_type_summary.n_shadowed,
dataset_type_summary.n_predicted_only,
dataset_type_summary.n_unsuccessful,
dataset_type_summary.n_cursed,
]
),
"EXPECTED": dataset_type_summary.n_expected,
}
)
if dataset_type_summary.n_cursed > 0:
for cursed_dataset in dataset_type_summary.cursed_datasets:
print(
f"{dataset_type_name} has cursed quanta with message(s) {cursed_dataset.messages}. "
"Recommend processing cease until the issue is resolved."
)
cursed_datasets.append(
{
"Dataset Type": dataset_type_name,
"Producer Data Id": cursed_dataset.producer_data_id,
}
)
if dataset_type_summary.n_unsuccessful > 0:
unsuccessful_datasets[dataset_type_name] = dataset_type_summary.unsuccessful_datasets
datasets = Table(dataset_table)
datasets.pprint_all()
curse_table = Table(cursed_datasets)
# Display wonky quanta
if wonky_quanta_table:
print("Wonky Quanta")
pprint.pprint(wonky_quanta_table)
# Display cursed datasets
if cursed_datasets:
print("Cursed Datasets")
curse_table.pprint_all()
summary.pprint(brief=(brief or bool(full_output_filename)))
if full_output_filename:
with open(full_output_filename, "w") as stream:
stream.write(summary.model_dump_json(indent=2))
else:
if not brief:
if failed_quanta_table:
print("Failed Quanta")
pprint.pprint(failed_quanta_table)
if unsuccessful_datasets:
print("Unsuccessful Datasets")
pprint.pprint(unsuccessful_datasets)
3 changes: 0 additions & 3 deletions tests/test_cliCmdReport.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,6 @@ def test_report(self):
self.assertIn("TOTAL", result_v2_terminal_out.stdout)
self.assertIn("EXPECTED", result_v2_terminal_out.stdout)

# Check that title from the error summary appears
self.assertIn("Unsuccessful Datasets", result_v2_terminal_out.stdout)

# Test cli for the QPG brief option
result_v2_brief = self.runner.invoke(
pipetask_cli,
Expand Down
152 changes: 148 additions & 4 deletions tests/test_simple_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,75 @@ def test_partial_outputs_success(self):
prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
(quantum_key_a,) = prov.quanta["a"]
quantum_info_a = prov.get_quantum_info(quantum_key_a)
_, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
self.assertEqual(
quantum_info_a["caveats"],
quantum_run_a.caveats,
QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
| QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
| QuantumSuccessCaveats.PARTIAL_OUTPUTS_ERROR,
)
self.assertEqual(
quantum_run_a.exception.type_name,
"lsst.pipe.base.tests.mocks.MockAlgorithmError",
)
self.assertEqual(
quantum_run_a.exception.metadata,
{"badness": 12},
)
(quantum_key_b,) = prov.quanta["b"]
quantum_info_b = prov.get_quantum_info(quantum_key_b)
self.assertEqual(quantum_info_b["caveats"], QuantumSuccessCaveats.NO_CAVEATS)
_, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)
self.assertEqual(quantum_run_b.caveats, QuantumSuccessCaveats.NO_CAVEATS)
prov_summary = prov.to_summary(self.butler)
# One partial-outputs case, with an empty data ID:
self.assertEqual(prov_summary.tasks["a"].caveats, {"*P": [{}]})
self.assertEqual(
prov_summary.tasks["a"].exceptions.keys(), {"lsst.pipe.base.tests.mocks.MockAlgorithmError"}
)
self.assertEqual(
prov_summary.tasks["a"]
.exceptions["lsst.pipe.base.tests.mocks.MockAlgorithmError"][0]
.exception.metadata,
{"badness": 12},
)
# No caveats for the second task, since it didn't need the first task's
# output anyway.
self.assertEqual(prov_summary.tasks["b"].caveats, {})
self.assertEqual(prov_summary.tasks["b"].exceptions, {})
# Check table forms for summaries of the same information.
quantum_table = prov_summary.make_quantum_table()
self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
self.assertEqual(list(quantum_table["Successful"]), [1, 1])
self.assertEqual(list(quantum_table["Caveats"]), ["*P(1)", ""])
self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
self.assertEqual(list(quantum_table["Failed"]), [0, 0])
self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
dataset_table = prov_summary.make_dataset_table()
self.assertEqual(
list(dataset_table["Dataset"]),
["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
)
self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 1, 1, 1])
self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
exception_table = prov_summary.make_exception_table()
self.assertEqual(list(exception_table["Task"]), ["a"])
self.assertEqual(
list(exception_table["Exception"]), ["lsst.pipe.base.tests.mocks.MockAlgorithmError"]
)
self.assertEqual(list(exception_table["Count"]), [1])
bad_quantum_tables = prov_summary.make_bad_quantum_tables()
self.assertEqual(bad_quantum_tables.keys(), {"a"})
self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["SUCCESSFUL(P)"])
self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), ["MockAlgorithmError"])
self.assertFalse(prov_summary.make_bad_dataset_tables())

def test_no_work_found(self):
"""Test executing two quanta where the first raises
Expand Down Expand Up @@ -304,21 +364,56 @@ def test_no_work_found(self):
prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
(quantum_key_a,) = prov.quanta["a"]
quantum_info_a = prov.get_quantum_info(quantum_key_a)
_, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
self.assertEqual(
quantum_info_a["caveats"],
quantum_run_a.caveats,
QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
| QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
| QuantumSuccessCaveats.NO_WORK,
)
(quantum_key_b,) = prov.quanta["b"]
quantum_info_b = prov.get_quantum_info(quantum_key_b)
_, quantum_run_b = qpg.QuantumRun.find_final(quantum_info_b)
self.assertEqual(
quantum_info_b["caveats"],
quantum_run_b.caveats,
QuantumSuccessCaveats.ALL_OUTPUTS_MISSING
| QuantumSuccessCaveats.ANY_OUTPUTS_MISSING
| QuantumSuccessCaveats.ADJUST_QUANTUM_RAISED
| QuantumSuccessCaveats.NO_WORK,
)
prov_summary = prov.to_summary(self.butler)
# One NoWorkFound, raised by runQuantum, with an empty data ID:
self.assertEqual(prov_summary.tasks["a"].caveats, {"*N": [{}]})
self.assertEqual(prov_summary.tasks["a"].exceptions, {})
# One NoWorkFound, raised by adjustQuantum, with an empty data ID.
self.assertEqual(prov_summary.tasks["b"].caveats, {"*A": [{}]})
self.assertEqual(prov_summary.tasks["b"].exceptions, {})
# Check table forms for summaries of the same information.
quantum_table = prov_summary.make_quantum_table()
self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
self.assertEqual(list(quantum_table["Successful"]), [1, 1])
self.assertEqual(list(quantum_table["Caveats"]), ["*N(1)", "*A(1)"])
self.assertEqual(list(quantum_table["Blocked"]), [0, 0])
self.assertEqual(list(quantum_table["Failed"]), [0, 0])
self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
dataset_table = prov_summary.make_dataset_table()
self.assertEqual(
list(dataset_table["Dataset"]),
["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
)
self.assertEqual(list(dataset_table["Visible"]), [0, 1, 1, 0, 1, 1])
self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Predicted Only"]), [1, 0, 0, 1, 0, 0])
self.assertEqual(list(dataset_table["Unsuccessful"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
self.assertFalse(prov_summary.make_exception_table())
self.assertFalse(prov_summary.make_bad_quantum_tables())
self.assertFalse(prov_summary.make_bad_dataset_tables())

def test_partial_outputs_failure(self):
"""Test executing two quanta where the first raises
Expand Down Expand Up @@ -357,6 +452,55 @@ def test_partial_outputs_failure(self):
executor.run(register_dataset_types=True)
self.assertFalse(self.butler.exists("intermediate"))
self.assertFalse(self.butler.exists("output"))
prov = qpg.QuantumProvenanceGraph()
prov.assemble_quantum_provenance_graph(self.butler, [executor.quantum_graph])
(quantum_key_a,) = prov.quanta["a"]
quantum_info_a = prov.get_quantum_info(quantum_key_a)
_, quantum_run_a = qpg.QuantumRun.find_final(quantum_info_a)
self.assertEqual(quantum_run_a.status, qpg.QuantumRunStatus.FAILED)
self.assertIsNone(quantum_run_a.caveats)
self.assertIsNone(quantum_run_a.exception)
(quantum_key_b,) = prov.quanta["b"]
quantum_info_b = prov.get_quantum_info(quantum_key_b)
self.assertEqual(quantum_info_b["status"], qpg.QuantumInfoStatus.BLOCKED)
prov_summary = prov.to_summary(self.butler)
# One partial-outputs failure case for the first task.
self.assertEqual(prov_summary.tasks["a"].n_failed, 1)
# No direct failures, but one blocked for the second
self.assertEqual(prov_summary.tasks["b"].n_failed, 0)
self.assertEqual(prov_summary.tasks["b"].n_blocked, 1)
# Check table forms for summaries of the same information.
quantum_table = prov_summary.make_quantum_table()
self.assertEqual(list(quantum_table["Task"]), ["a", "b"])
self.assertEqual(list(quantum_table["Unknown"]), [0, 0])
self.assertEqual(list(quantum_table["Successful"]), [0, 0])
self.assertEqual(list(quantum_table["Caveats"]), ["", ""])
self.assertEqual(list(quantum_table["Blocked"]), [0, 1])
self.assertEqual(list(quantum_table["Failed"]), [1, 0])
self.assertEqual(list(quantum_table["Wonky"]), [0, 0])
self.assertEqual(list(quantum_table["TOTAL"]), [1, 1])
self.assertEqual(list(quantum_table["EXPECTED"]), [1, 1])
dataset_table = prov_summary.make_dataset_table()
self.assertEqual(
list(dataset_table["Dataset"]),
["intermediate", "a_metadata", "a_log", "output", "b_metadata", "b_log"],
)
# Note that a_log is UNSUCCESSFUL, not VISIBLE, despite being present
# in butler because those categories are mutually exclusive and we
# don't want to consider any outputs of failed quanta to be successful.
self.assertEqual(list(dataset_table["Visible"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Shadowed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Predicted Only"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["Unsuccessful"]), [1, 1, 1, 1, 1, 1])
self.assertEqual(list(dataset_table["Cursed"]), [0, 0, 0, 0, 0, 0])
self.assertEqual(list(dataset_table["TOTAL"]), [1, 1, 1, 1, 1, 1])
self.assertEqual(list(dataset_table["EXPECTED"]), [1, 1, 1, 1, 1, 1])
self.assertFalse(prov_summary.make_exception_table())
bad_quantum_tables = prov_summary.make_bad_quantum_tables()
self.assertEqual(bad_quantum_tables.keys(), {"a"})
self.assertEqual(list(bad_quantum_tables["a"]["Status(Caveats)"]), ["FAILED"])
self.assertEqual(list(bad_quantum_tables["a"]["Exception"]), [""])
self.assertFalse(prov_summary.make_bad_dataset_tables())

def test_existence_check_skips(self):
"""Test that pre-execution existence checks are not performed for
Expand Down

0 comments on commit 1164789

Please sign in to comment.