diff --git a/bin/trond b/bin/trond
index ae13634cf..2a84deae3 100755
--- a/bin/trond
+++ b/bin/trond
@@ -3,7 +3,6 @@
 import argparse
 import logging
 import os
-import queue
 import traceback
 
 import pkg_resources
@@ -22,14 +21,6 @@
 DEFAULT_LOCKFILE = "tron.lock"
 DEFAULT_LOCKPATH = "/var/run/" + DEFAULT_LOCKFILE
 
 
-def patch_queue() -> None:
-    # this is absolutely disgusting, but something about the locking behavior in the C version of this
-    # appears to be wreaking havoc on either our taskproc queues or potentially Twisted-internal queues.
-    # we've gone the gevent route (see gevent/gevent#1253) and monkey-patched back to the Python implementation
-    # (i.e., the implementation used in py36).
-    queue.SimpleQueue = queue._PySimpleQueue
-
-
 def parse_cli():
     parser = argparse.ArgumentParser()
@@ -178,7 +169,6 @@ def main():
     args = parse_cli()
     setup_environment(args)
-    patch_queue()
    trond = trondaemon.TronDaemon(args)
    trond.run()
 
 
diff --git a/requirements.txt b/requirements.txt
index 2d56568b1..503d519d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,6 +15,7 @@ certifi==2022.12.7
 cffi==1.12.3
 cfn-lint==0.24.4
 chardet==3.0.4
+clusterman-metrics==2.2.1 # used by tron for pre-scaling for Spark runs
 constantly==15.1.0
 cryptography==39.0.1
 dataclasses==0.6
@@ -75,6 +76,7 @@ requests-oauthlib==1.2.0
 responses==0.10.6
 rsa==4.9
 s3transfer==0.6.0
+scribereader==0.14.1 # used by tron to get tronjob logs
 setuptools==65.5.1
 six==1.15.0
 sshpubkeys==3.1.0
@@ -89,4 +91,7 @@ websocket-client==0.56.0
 Werkzeug==2.2.3
 wrapt==1.11.2
 xmltodict==0.12.0
+yelp-clog==7.0.1 # scribereader dependency
+yelp-logging==4.17.0 # scribereader dependency
+yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency
 zope.interface==5.1.0
diff --git a/tron/kubernetes.py b/tron/kubernetes.py
index 371ba98fd..7056b31e1 100644
--- a/tron/kubernetes.py
+++ b/tron/kubernetes.py
@@ -1,3 +1,4 @@
+import json
 import logging
 from logging import Logger
 from typing import cast
@@ -40,6 +41,17 @@
 KUBERNETES_LOST_NODE_EXIT_CODES = {exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION, exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN}
 
 log = logging.getLogger(__name__)
+try:
+    import clog  # type: ignore
+
+    clog.config.configure(
+        scribe_host="169.254.255.254",
+        scribe_port=1463,
+        monk_disable=False,
+        scribe_disable=False,
+    )
+except ImportError:
+    clog = None
 
 
 def combine_volumes(
@@ -125,138 +137,158 @@ def handle_event(self, event: Event) -> None:
         """
         Transitions Tron's state machine for this task based on events from task_processing.
""" - event_id = getattr(event, "task_id", None) - if event_id != self.get_kubernetes_id(): - self.log.warning( - f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.", - ) - return - - k8s_type = getattr(event, "platform_type", None) - self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).") - try: - self.log_event_info(event=event) - except Exception: - self.log.exception(f"Unable to log event info for id={event_id}.") - - if k8s_type == "running": - self.started() - elif k8s_type in KUBERNETES_TERMINAL_TYPES: - raw_object = getattr(event, "raw", {}) or {} - pod_status = raw_object.get("status", {}) or {} - container_statuses = pod_status.get("containerStatuses", []) or [] - exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL - - if len(container_statuses) > 1 or len(container_statuses) == 0: - # shouldn't happen right now, but who knows what future us will do :p - self.log.error( - "Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success." + # Might be that somewhere in this function, something is throwing an exception and we're not handling it + # wrap this whole function in a big block of try/except block + # if handle_event errors out for any reason then we will never see this event at all + event_id = getattr(event, "task_id", None) + if event_id != self.get_kubernetes_id(): + self.log.warning( + f"Event task id (id={event_id}) does not match current task id (id={self.get_kubernetes_id()}), ignoring.", ) - self.log.error(f"Event with >1 || 0 containers: {raw_object}") - else: - main_container_statuses = container_statuses[0] - main_container_state = main_container_statuses.get("state", {}) or {} - main_container_last_state = main_container_statuses.get("lastState", {}) or {} + return - event_missing_state = not main_container_state - event_missing_previous_state = not main_container_last_state + k8s_type = getattr(event, "platform_type", None) + self.log.info(f"Got event for task={event_id} (Kubernetes type={k8s_type}).") - # We are expecting this code to never be hit as we are expecting both state and last_state have values - # The else statement should handle the situation gracefully when either current/last state are missing - if event_missing_state and event_missing_previous_state: + try: + self.log_event_info(event=event) + except Exception: + self.log.exception(f"Unable to log event info for id={event_id}.") + + if k8s_type == "running": + self.started() + elif k8s_type in KUBERNETES_TERMINAL_TYPES: + raw_object = getattr(event, "raw", {}) or {} + pod_status = raw_object.get("status", {}) or {} + container_statuses = pod_status.get("containerStatuses", []) or [] + exit_code = 0 if k8s_type == "finished" else exitcode.EXIT_KUBERNETES_ABNORMAL + + if len(container_statuses) > 1 or len(container_statuses) == 0: + # shouldn't happen right now, but who knows what future us will do :p self.log.error( - f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}." + "Got an event for a Pod with zero or multiple containers - not inspecting payload to verify success." 
) - self.log.error(f"Event with missing state: {raw_object}") + self.log.error(f"Event with >1 || 0 containers: {raw_object}") else: - state_termination_metadata = main_container_state.get("terminated", {}) or {} - last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {} - if k8s_type == "finished": - # this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually - # due to what appear to be race conditons like those mentioned in - # https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that - # these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually - # started. So far, we've noticed that when this happens, the finished_at and reason fields will be None - # and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail" - # the affected action - # NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect - # success) being fixed :p - if state_termination_metadata.get("exitCode") == 0 and ( - state_termination_metadata.get("finishedAt") is None - and state_termination_metadata.get("reason") is None - ): - exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL - self.log.warning("Container never started due to a Kubernetes/infra flake!") - self.log.warning( - f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." - ) - elif k8s_type in KUBERNETES_FAILURE_TYPES: - # pod killed before it reached terminal state, assume node scaledown - if not (state_termination_metadata or last_state_termination_metadata): - self.log.warning("Container did not complete, likely due to scaling down a node.") - exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN - - # Handling spot terminations - elif ( - last_state_termination_metadata.get("exitCode") == 137 - and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown" - ): - exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION - self.log.warning("Tronjob failed due to spot interruption.") - # Handling K8s scaling down a node - elif state_termination_metadata.get("exitCode") == 143 and ( - state_termination_metadata.get("reason") == "Error" - ): - exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN - self.log.warning("Tronjob failed due to Kubernetes scaling down a node.") - else: - # Capture the real exit code - state_exit_code = state_termination_metadata.get("exitCode") - last_state_exit_code = last_state_termination_metadata.get("exitCode") - if state_exit_code: - exit_code = state_exit_code - elif last_state_exit_code: - exit_code = last_state_exit_code - - if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES: - self.log.warning( - f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." - ) - self.log.warning( - "If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default." - ) - self.exited(exit_code) - elif k8s_type == "lost": - # Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called, - # the tasks inside task_metadata map are all UNKNOWN - self.log.warning("Kubernetes does not know anything about this task, it is LOST") - self.log.warning( - "This can happen for any number of reasons, and Tron can't know if the task ran or not at all!" 
- ) - self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:") - self.log.warning(f" tronctl retry {self.id}") - self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:") - self.log.warning(f" tronctl skip {self.id}") - self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:") - self.log.warning(f" tronctl fail {self.id}") - self.exited(None) - else: - self.log.info( - f"Did not handle unknown kubernetes event type: {event}", - ) - - if event.terminal: - self.log.info("This Kubernetes event was terminal, ending this action") - self.report_resources(decrement=True) - - exit_code = int(not getattr(event, "success", False)) - # Returns False if we've already exited normally above - unexpected_error = self.exited(exit_code) - if unexpected_error: - self.log.error("Unexpected failure, exiting") + main_container_statuses = container_statuses[0] + main_container_state = main_container_statuses.get("state", {}) or {} + main_container_last_state = main_container_statuses.get("lastState", {}) or {} + + event_missing_state = not main_container_state + event_missing_previous_state = not main_container_last_state + + # We are expecting this code to never be hit as we are expecting both state and last_state have values + # The else statement should handle the situation gracefully when either current/last state are missing + if event_missing_state and event_missing_previous_state: + self.log.error( + f"Got an event with missing state - assuming {'success' if exit_code==0 else 'failure'}." + ) + self.log.error(f"Event with missing state: {raw_object}") + else: + state_termination_metadata = main_container_state.get("terminated", {}) or {} + last_state_termination_metadata = main_container_last_state.get("terminated", {}) or {} + if k8s_type == "finished": + # this is kinda wild: we're seeing that a kubelet will sometimes fail to start a container (usually + # due to what appear to be race conditons like those mentioned in + # https://github.com/kubernetes/kubernetes/issues/100047#issuecomment-797624208) and then decide that + # these Pods should be phase=Succeeded with an exit code of 0 - even though the container never actually + # started. So far, we've noticed that when this happens, the finished_at and reason fields will be None + # and thus we'll check for at least one of these conditions to detect an abnormal exit and actually "fail" + # the affected action + # NOTE: hopefully this won't change too drastically in future k8s upgrades without the actual problem (incorrect + # success) being fixed :p + if state_termination_metadata.get("exitCode") == 0 and ( + state_termination_metadata.get("finishedAt") is None + and state_termination_metadata.get("reason") is None + ): + exit_code = exitcode.EXIT_KUBERNETES_ABNORMAL + self.log.warning("Container never started due to a Kubernetes/infra flake!") + self.log.warning( + f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." 
+ ) + elif k8s_type in KUBERNETES_FAILURE_TYPES: + # pod killed before it reached terminal state, assume node scaledown + if not (state_termination_metadata or last_state_termination_metadata): + self.log.warning("Container did not complete, likely due to scaling down a node.") + exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN + + # Handling spot terminations + elif ( + last_state_termination_metadata.get("exitCode") == 137 + and last_state_termination_metadata.get("reason") == "ContainerStatusUnknown" + ): + exit_code = exitcode.EXIT_KUBERNETES_SPOT_INTERRUPTION + self.log.warning("Tronjob failed due to spot interruption.") + # Handling K8s scaling down a node + elif state_termination_metadata.get("exitCode") == 143 and ( + state_termination_metadata.get("reason") == "Error" + ): + exit_code = exitcode.EXIT_KUBERNETES_NODE_SCALEDOWN + self.log.warning("Tronjob failed due to Kubernetes scaling down a node.") + else: + # Capture the real exit code + state_exit_code = state_termination_metadata.get("exitCode") + last_state_exit_code = last_state_termination_metadata.get("exitCode") + if state_exit_code: + exit_code = state_exit_code + elif last_state_exit_code: + exit_code = last_state_exit_code + + if exit_code in KUBERNETES_LOST_NODE_EXIT_CODES: + self.log.warning( + f"If automatic retries are not enabled, run `tronctl retry {self.id}` to retry." + ) + self.log.warning( + "If this action is idempotent, then please consider enabling automatic retries for your action. If your action is not idempotent, then please configure this action to run on the stable pool rather than the default." + ) + self.exited(exit_code) + elif k8s_type == "lost": + # Using 'lost' instead of 'unknown' for now until we are sure that before reconcile() is called, + # the tasks inside task_metadata map are all UNKNOWN + self.log.warning("Kubernetes does not know anything about this task, it is LOST") + self.log.warning( + "This can happen for any number of reasons, and Tron can't know if the task ran or not at all!" + ) + self.log.warning("If you want Tron to RUN it (again) anyway, retry it with:") + self.log.warning(f" tronctl retry {self.id}") + self.log.warning("If you want Tron to NOT run it and consider it as a success, skip it with:") + self.log.warning(f" tronctl skip {self.id}") + self.log.warning("If you want Tron to NOT run it and consider it as a failure, fail it with:") + self.log.warning(f" tronctl fail {self.id}") + self.exited(None) + else: + self.log.info( + f"Did not handle unknown kubernetes event type: {event}", + ) - self.done() + if event.terminal: + self.log.info("This Kubernetes event was terminal, ending this action") + self.report_resources(decrement=True) + + exit_code = int(not getattr(event, "success", False)) + # Returns False if we've already exited normally above + unexpected_error = self.exited(exit_code) + if unexpected_error: + self.log.error("Unexpected failure, exiting") + + self.done() + except Exception as e: + self.log.exception(f"unable to handle an event for id={event_id} for event={str(event)}") + # clog here and make sure the message is a string + if clog is None: + log.debug("Clog logger unavailable. 
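
Reviewer note (illustrative, not part of the patch): the terminal-event branch above boils down to a small decision table over the Pod's "terminated" metadata. The samples below are hypothetical payload fragments that restate that table; "failed" stands for any type in KUBERNETES_FAILURE_TYPES, and the expected values name the exitcode constants referenced in the hunk.

# Hypothetical samples only - the string names refer to the exitcode constants used above.
SAMPLES = [
    # (event type, state["terminated"], lastState["terminated"], exit code the logic assigns)
    # "finished" Pod whose container never actually started: exitCode 0 but no finishedAt/reason
    ("finished", {"exitCode": 0, "finishedAt": None, "reason": None}, {}, "EXIT_KUBERNETES_ABNORMAL"),
    # failure type with no terminated metadata at all: assume the node was scaled down
    ("failed", {}, {}, "EXIT_KUBERNETES_NODE_SCALEDOWN"),
    # failure type killed by a spot interruption
    ("failed", {}, {"exitCode": 137, "reason": "ContainerStatusUnknown"}, "EXIT_KUBERNETES_SPOT_INTERRUPTION"),
    # failure type killed by Kubernetes scaling down the node
    ("failed", {"exitCode": 143, "reason": "Error"}, {}, "EXIT_KUBERNETES_NODE_SCALEDOWN"),
    # anything else: surface the container's own exit code
    ("failed", {"exitCode": 1, "reason": "Error"}, {}, 1),
]
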
diff --git a/tron/trondaemon.py b/tron/trondaemon.py
index be1088b3d..7c3591f8a 100644
--- a/tron/trondaemon.py
+++ b/tron/trondaemon.py
@@ -7,7 +7,6 @@
 import signal
 import threading
 import time
-from functools import lru_cache
 
 import ipdb
 import pkg_resources
@@ -55,11 +54,6 @@ def setup_logging(options):
     # Show stack traces for errors in twisted deferreds.
     if options.debug:
         defer.setDebugging(True)
-    # Cache getLogger calls as we are seeing kubernetes-client locking on logging causing event processing to be extremely delayed
-    # This is a workaround and ideally we would want to remove this once this is fixed upstream in kubernetes-client
-    # For more details: https://github.com/kubernetes-client/python/issues/1867
-    # For Yelpers, misc/jolt#148 has a similar workaround for this issue internally.
-    logging.getLogger = lru_cache(maxsize=None)(logging.getLogger)
 
 
 @contextlib.contextmanager
diff --git a/yelp_package/extra_requirements_yelp.txt b/yelp_package/extra_requirements_yelp.txt
index 4d3333148..c09a8a6d8 100644
--- a/yelp_package/extra_requirements_yelp.txt
+++ b/yelp_package/extra_requirements_yelp.txt
@@ -1,5 +1,5 @@
 clusterman-metrics==2.2.1 # used by tron for pre-scaling for Spark runs
 scribereader==0.14.1 # used by tron to get tronjob logs
-yelp-clog==5.2.3 # scribereader dependency
+yelp-clog==7.0.1 # scribereader dependency
 yelp-logging==4.17.0 # scribereader dependency
 yelp-meteorite==2.1.1 # used by task-processing to emit metrics, clusterman-metrics dependency
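
Reviewer note (rough sketch, not part of the patch): one way the new fallback path in tron/kubernetes.py could be exercised. This assumes the task class is tron.kubernetes.KubernetesTask, that the module-level clog attribute can be patched, and that forcing get_kubernetes_id() to raise is enough to reach the new except block; adjust names to fit the real test suite.

from unittest import mock

from tron.kubernetes import KubernetesTask


def test_handle_event_reports_missed_event_to_clog():
    task = mock.MagicMock()
    # Force an arbitrary failure inside handle_event so the broad except block runs.
    task.get_kubernetes_id.side_effect = RuntimeError("boom")
    event = mock.MagicMock(task_id="fake.task.id")

    with mock.patch("tron.kubernetes.clog") as fake_clog:
        KubernetesTask.handle_event(task, event)

    # The missed event should be pushed to the temporary scribe stream as a JSON string.
    stream, line = fake_clog.log_line.call_args[0]
    assert stream == "tmp_missed_tronevents"
    assert "RuntimeError" in line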