diff --git a/tests/utils/scribereader_test.py b/tests/utils/scribereader_test.py index d2a01eb29..f6f1c16ea 100644 --- a/tests/utils/scribereader_test.py +++ b/tests/utils/scribereader_test.py @@ -2,8 +2,10 @@ from unittest import mock import pytest +import yaml import tron.utils.scribereader +from tron.utils.scribereader import decompose_action_id from tron.utils.scribereader import read_log_stream_for_action_run try: @@ -420,3 +422,78 @@ def test_read_log_stream_for_action_run_min_date_and_max_date_for_long_output(): # The expected output should be max_lines plus the # extra line for 'This output is truncated.' message assert len(output) == max_lines + 1 + + +def test_decompose_action_id_yml_file_found(): + action_run_id = "namespace.job.1234.action" + paasta_cluster = "fake_cluster" + config_content = """ + job: + actions: + action: + service: test_service + """ + with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch( + "yaml.safe_load", return_value=yaml.safe_load(config_content) + ): + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) + assert namespace == "test_service" + assert job_name == "job" + assert run_num == "1234" + assert action == "action" + + +def test_decompose_action_id_file_not_found(): + action_run_id = "namespace.job.1234.action" + paasta_cluster = "fake_cluster" + with mock.patch("builtins.open", side_effect=FileNotFoundError): + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) + assert namespace == "namespace" + assert job_name == "job" + assert run_num == "1234" + assert action == "action" + + +def test_decompose_action_id_yaml_error(): + action_run_id = "namespace.job.1234.action" + paasta_cluster = "fake_cluster" + with mock.patch("builtins.open", mock.mock_open(read_data="invalid_yaml")), mock.patch( + "yaml.safe_load", side_effect=yaml.YAMLError + ): + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) + assert namespace == "namespace" + assert job_name == "job" + assert run_num == "1234" + assert action == "action" + + +def test_decompose_action_id_generic_error(): + action_run_id = "namespace.job.1234.action" + paasta_cluster = "fake_cluster" + with mock.patch("builtins.open", mock.mock_open(read_data="some_data")), mock.patch( + "yaml.safe_load", side_effect=Exception + ): + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) + assert namespace == "namespace" + assert job_name == "job" + assert run_num == "1234" + assert action == "action" + + +def test_decompose_action_id_service_not_found(): + action_run_id = "namespace.job.1234.action" + paasta_cluster = "fake_cluster" + config_content = """ + job: + actions: + action: + command: "sleep 10" + """ + with mock.patch("builtins.open", mock.mock_open(read_data=config_content)), mock.patch( + "yaml.safe_load", return_value=yaml.safe_load(config_content) + ): + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) + assert namespace == "namespace" + assert job_name == "job" + assert run_num == "1234" + assert action == "action" diff --git a/tron/utils/scribereader.py b/tron/utils/scribereader.py index 381d8a534..d79cc2f94 100644 --- a/tron/utils/scribereader.py +++ b/tron/utils/scribereader.py @@ -10,6 +10,7 @@ from typing import Tuple import staticconf # type: ignore +import yaml from tron.config.static_config import get_config_watcher from tron.config.static_config import NAMESPACE @@ -81,12 +82,37 @@ def get_scribereader_host_and_port(ecosystem: str, superregion: str, region: str return host, port +def decompose_action_id(action_run_id: str, paasta_cluster: str) -> Tuple[str, str, str, str]: + namespace, job_name, run_num, action = action_run_id.split(".") + for ext in ["yaml", "yml"]: + try: + with open(f"/nail/etc/services/{namespace}/tron-{paasta_cluster}.{ext}") as f: + config = yaml.load(f, Loader=yaml.CSafeLoader) + service: Optional[str] = ( + config.get(job_name, {}).get("actions", {}).get(action, {}).get("service", None) + ) + if service: + return service, job_name, run_num, action + except FileNotFoundError: + log.warning(f"yelp-soaconfig file tron-{paasta_cluster}.{ext} not found for action_run_id {action_run_id}.") + except yaml.YAMLError: + log.exception( + f"Error parsing YAML file tron-{paasta_cluster}.yaml for {action_run_id} - will default to using current namespace:" + ) + except Exception: + log.exception( + f"Error reading service for {action_run_id} from file tron-{paasta_cluster}.yaml - will default to using current namespace:" + ) + + return namespace, job_name, run_num, action + + class PaaSTALogs: def __init__(self, component: str, paasta_cluster: str, action_run_id: str) -> None: self.component = component self.paasta_cluster = paasta_cluster self.action_run_id = action_run_id - namespace, job_name, run_num, action = action_run_id.split(".") + namespace, job_name, run_num, action = decompose_action_id(action_run_id, paasta_cluster) # in our logging infra, things are logged to per-instance streams - but # since Tron PaaSTA instances are of the form `job_name.action`, we need # to escape the period since some parts of our infra will reject streams