From ec733d3aa84f10f1b1677692c5420721aec87dc5 Mon Sep 17 00:00:00 2001
From: Andrew Madonna
Date: Tue, 8 Aug 2017 13:33:45 -0400
Subject: [PATCH] Add Slack monitoring (#13)

---
 setup.py                           |   5 +-
 taskflow/core/models.py            |   2 +
 taskflow/monitoring/aws.py         |   4 +-
 taskflow/monitoring/base.py        |  34 +++++++
 taskflow/monitoring/slack.py       | 142 +++++++++++++++++++++++++++++
 taskflow/push_workers/aws_batch.py |  13 +++
 tests/test_monitoring.py           | 134 +++++++++++++++++++++++++++
 7 files changed, 330 insertions(+), 4 deletions(-)
 create mode 100644 taskflow/monitoring/slack.py
 create mode 100644 tests/test_monitoring.py

diff --git a/setup.py b/setup.py
index 7523e5b..125dba1 100644
--- a/setup.py
+++ b/setup.py
@@ -5,12 +5,13 @@
 
 setup(
     name='taskflow',
-    version='0.0.2',
+    version='0.1.0',
     packages=find_packages(),
     install_requires=[
         'alembic==0.9.2',
         'boto3==1.4.4',
         'click==6.7',
+        'cron-descriptor==1.2.10',
         'croniter==0.3.17',
         'Flask==0.12.2',
         'Flask-Cors==3.0.2',
@@ -27,6 +28,6 @@
         'toposort==1.5'
     ],
     dependency_links=[
-        'https://github.com/CityOfPhiladelphia/restful-ben/tarball/master#egg=restful_ben-0.1.0'
+        'https://github.com/CityOfPhiladelphia/restful-ben/tarball/0.0.1#egg=restful_ben-0.1.0'
     ],
 )
diff --git a/taskflow/core/models.py b/taskflow/core/models.py
index bff8327..d81b495 100644
--- a/taskflow/core/models.py
+++ b/taskflow/core/models.py
@@ -269,6 +269,8 @@ def add_push_worker(self, push_worker):
         self._push_workers[push_worker.push_type] = push_worker
 
     def get_push_worker(self, push_type):
+        if push_type not in self._push_workers:
+            return None
         return self._push_workers[push_type]
 
     def sync_tasks(self, session, tasks):
diff --git a/taskflow/monitoring/aws.py b/taskflow/monitoring/aws.py
index ff5549d..dbd52be 100644
--- a/taskflow/monitoring/aws.py
+++ b/taskflow/monitoring/aws.py
@@ -1,8 +1,8 @@
 import boto3
 
-from .base import Monitor
+from .base import MonitorDestination
 
-class AWSMonitor(Monitor):
+class AWSMonitor(MonitorDestination):
     def __init__(self,
                  metric_prefix='',
                  metric_namespace='taskflow',
diff --git a/taskflow/monitoring/base.py b/taskflow/monitoring/base.py
index 6040493..cc131f0 100644
--- a/taskflow/monitoring/base.py
+++ b/taskflow/monitoring/base.py
@@ -1,5 +1,39 @@
+import logging
 
 class Monitor(object):
+    def __init__(self, destinations=None):
+        self.logger = logging.getLogger('Monitoring')
+
+        self.destinations = destinations or []
+
+    def call_destinations(self, fn_name, *args):
+        for destination in self.destinations:
+            try:
+                getattr(destination, fn_name)(*args)
+            except:
+                self.logger.exception('Exception trying to call `{}` on the `{}` monitor.'\
+                    .format(fn_name, destination.__class__.__name__))
+
+    def heartbeat_scheduler(self):
+        self.call_destinations('heartbeat_scheduler')
+
+    def task_retry(self, task_instance):
+        self.call_destinations('task_retry', task_instance)
+
+    def task_failed(self, task_instance):
+        self.call_destinations('task_failed', task_instance)
+
+    def task_success(self, task_instance):
+        self.call_destinations('task_success', task_instance)
+
+    def workflow_failed(self, workflow_instance):
+        self.call_destinations('workflow_failed', workflow_instance)
+
+    def workflow_success(self, workflow_instance):
+        self.call_destinations('workflow_success', workflow_instance)
+
+
+class MonitorDestination(object):
     def heartbeat_scheduler(self):
         pass
 
diff --git a/taskflow/monitoring/slack.py b/taskflow/monitoring/slack.py
new file mode 100644
index 0000000..fb82197
--- /dev/null
+++ b/taskflow/monitoring/slack.py
@@ -0,0 +1,142 @@
+import os
+import json
+
+from cron_descriptor import get_description
+import requests
+
+from taskflow import WorkflowInstance, TaskInstance
+from .base import MonitorDestination
+
+class SlackMonitor(MonitorDestination):
+    def __init__(self, session, taskflow, slack_url=None):
+        self.slack_url = slack_url or os.getenv('SLACK_WEBHOOK_URL')
+        self.session = session
+        self.taskflow = taskflow
+
+    def get_log_url(self, failed_task):
+        push_destination = self.taskflow.get_task(failed_task.task_name).push_destination
+        push_worker = self.taskflow.get_push_worker(push_destination)
+        if push_worker and hasattr(push_worker, 'get_log_url'):
+            return push_worker.get_log_url(failed_task)
+        return None
+
+    def get_message(self, item):
+        if isinstance(item, WorkflowInstance):
+            item_type = 'Workflow'
+            name = item.workflow_name
+
+            failed_tasks = self.session.query(TaskInstance)\
+                .filter(TaskInstance.workflow_instance_id == item.id,
+                        TaskInstance.status == 'failed')\
+                .all()
+
+            failed_task_attachments = []
+            for failed_task in failed_tasks:
+                failed_task_attachments.append({
+                    'title': 'Failed Workflow Task',
+                    'color': '#ff0000',
+                    'fields': [
+                        {
+                            'title': 'Task',
+                            'value': failed_task.task_name
+                        },
+                        {
+                            'title': 'ID',
+                            'value': failed_task.id
+                        },
+                        {
+                            'title': 'Number of Attempts',
+                            'value': failed_task.attempts
+                        },
+                        {
+                            'title': 'Logs',
+                            'value': self.get_log_url(failed_task)
+                        }
+                    ]
+                })
+        else:
+            item_type = 'Task'
+            name = item.task_name
+
+        attachments = [{
+            'title': '{} Failure'.format(item_type),
+            'text': ' A {} in Taskflow failed'.format(item_type.lower()),
+            'color': '#ff0000',
+            'fields': [
+                {
+                    'title': item_type,
+                    'value': name,
+                    'short': False
+                },
+                {
+                    'title': 'ID',
+                    'value': item.id
+                },
+                {
+                    'title': 'Priority',
+                    'value': item.priority
+                },
+                {
+                    'title': 'Scheduled Run Time',
+                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.run_at)
+                },
+                {
+                    'title': 'Start Time',
+                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.started_at)
+                },
+                {
+                    'title': 'Failure Time',
+                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.ended_at)
+                }
+            ]
+        }]
+
+        if item.scheduled:
+            workflow = self.taskflow.get_workflow(item.workflow_name)
+            attachments[0]['fields'].append({
+                'title': 'Schedule',
+                'value': '{} ({})'.format(get_description(workflow.schedule), workflow.schedule)
+            })
+
+        if isinstance(item, WorkflowInstance) and failed_task_attachments:
+            attachments += failed_task_attachments
+        else:
+            attachments[0]['fields'].append({
+                'title': 'Number of Attempts',
+                'value': item.attempts
+            })
+            attachments[0]['fields'].append({
+                'title': 'Logs',
+                'value': self.get_log_url(item)
+            })
+
+        return {'attachments': attachments}
+
+    def send_to_slack(self, message):
+        requests.post(
+            self.slack_url,
+            data=json.dumps(message),
+            headers={'Content-Type': 'application/json'})
+
+    def heartbeat_scheduler(self):
+        pass
+
+    def task_retry(self, task_instance):
+        pass
+
+    def task_failed(self, task_instance):
+        ## only alert on tasks not associated with a workflow.
+        ## Task failures will bubble up to the workflow
+        if task_instance.workflow_instance_id == None:
+            message = self.get_message(task_instance)
+            self.send_to_slack(message)
+
+    def task_success(self, task_instance):
+        pass
+
+    def workflow_failed(self, workflow_instance):
+        message = self.get_message(workflow_instance)
+        self.send_to_slack(message)
+
+    def workflow_success(self, workflow_instance):
+        pass
diff --git a/taskflow/push_workers/aws_batch.py b/taskflow/push_workers/aws_batch.py
index f9710f8..cb6ab6a 100644
--- a/taskflow/push_workers/aws_batch.py
+++ b/taskflow/push_workers/aws_batch.py
@@ -17,6 +17,18 @@ def __init__(self, *args, default_job_queue=None, default_job_definition=None, *
         self.default_job_queue = default_job_queue
         self.default_job_definition = default_job_definition
 
+    def get_log_url(self, task_instance):
+        try:
+            push_state = task_instance.push_state
+            if push_state and 'taskArn' in push_state:
+                task_id = re.match(r'task/(.+)', push_state['taskArn']).groups()[0]
+
+                return '{}/{}/{}'.format(push_state['jobName'][:50], push_state['jobId'], task_id)
+        except:
+            self.logger.exception('Exception getting AWS Batch log URL')
+
+        return None
+
     def sync_task_instance_states(self, session, dry_run, task_instances):
         jobs = dict()
         for task_instance in task_instances:
@@ -41,6 +53,7 @@ def sync_task_instance_states(self, session, dry_run, task_instances):
             task_instance = jobs[job['jobId']]
             if task_instance.status != status:
                 task_instance.status = status
+                task_instance.push_state = job
 
                 if not dry_run:
                     session.commit()
diff --git a/tests/test_monitoring.py b/tests/test_monitoring.py
new file mode 100644
index 0000000..c43224e
--- /dev/null
+++ b/tests/test_monitoring.py
@@ -0,0 +1,134 @@
+from datetime import datetime
+
+import requests_mock
+from restful_ben.test_utils import dict_contains
+
+from taskflow import WorkflowInstance, TaskInstance
+from taskflow.monitoring.slack import SlackMonitor
+
+from shared_fixtures import *
+
+def test_slack_monitor(dbsession, workflows):
+    taskflow = Taskflow()
+    taskflow.add_workflows(workflows)
+
+    workflow_instance = WorkflowInstance(
+        workflow_name='workflow1',
+        scheduled=True,
+        run_at=datetime(2017, 6, 3, 6),
+        started_at=datetime(2017, 6, 3, 6),
+        ended_at=datetime(2017, 6, 3, 6, 0, 36),
+        status='running',
+        priority='normal')
+    dbsession.add(workflow_instance)
+    dbsession.commit()
+    task_instance1 = TaskInstance(
+        task_name='task1',
+        scheduled=True,
+        workflow_instance_id=workflow_instance.id,
+        status='success',
+        run_at=datetime(2017, 6, 3, 6, 0, 34),
+        attempts=1,
+        priority='normal',
+        push=False,
+        timeout=300,
+        retry_delay=300)
+    task_instance2 = TaskInstance(
+        task_name='task2',
+        scheduled=True,
+        workflow_instance_id=workflow_instance.id,
+        status='success',
+        run_at=datetime(2017, 6, 3, 6, 0, 34),
+        attempts=1,
+        priority='normal',
+        push=False,
+        timeout=300,
+        retry_delay=300)
+    task_instance3 = TaskInstance(
+        task_name='task3',
+        scheduled=True,
+        workflow_instance_id=workflow_instance.id,
+        status='failed',
+        run_at=datetime(2017, 6, 3, 6, 0, 34),
+        attempts=1,
+        priority='normal',
+        push=False,
+        timeout=300,
+        retry_delay=300)
+    dbsession.add(task_instance1)
+    dbsession.add(task_instance2)
+    dbsession.add(task_instance3)
+    dbsession.commit()
+
+    slack_url = 'http://fakeurl.com/foo'
+
+    slack_monitor = SlackMonitor(dbsession, taskflow, slack_url=slack_url)
+
+    with requests_mock.Mocker() as m:
+        m.post(slack_url)
+
+        slack_monitor.workflow_failed(workflow_instance)
+
+        assert m.called
+        assert m.call_count == 1
+
+        request = m.request_history[0]
+        data = request.json()
+        assert dict_contains(data['attachments'][0], {
+            'text': ' A workflow in Taskflow failed',
+            'title': 'Workflow Failure',
+            'color': '#ff0000',
+            'fields': [
+                {
+                    'title': 'Workflow',
+                    'short': False,
+                    'value': 'workflow1'
+                },
+                {
+                    'title': 'ID',
+                    'value': 1
+                },
+                {
+                    'title': 'Priority',
+                    'value': 'normal'
+                },
+                {
+                    'title': 'Scheduled Run Time',
+                    'value': '2017-06-03 06:00:00'
+                },
+                {
+                    'title': 'Start Time',
+                    'value': '2017-06-03 06:00:00'
+                },
+                {
+                    'title': 'Failure Time',
+                    'value': '2017-06-03 06:00:36'
+                },
+                {
+                    'title': 'Schedule',
+                    'value': 'At 06:00 AM (0 6 * * *)'
+                }
+            ]
+        })
+        assert dict_contains(data['attachments'][1], {
+            'title': 'Failed Workflow Task',
+            'color': '#ff0000',
+            'fields': [
+                {
+                    'title': 'Task',
+                    'value': 'task3'
+                },
+                {
+                    'title': 'ID',
+                    'value': 3
+                },
+                {
+                    'title': 'Number of Attempts',
+                    'value': 1
+                },
+                {
+                    'title': 'Logs',
+                    'value': None
+                }
+            ]
+        })
\ No newline at end of file