This repository has been archived by the owner on Nov 2, 2023. It is now read-only.

Commit

Add Slack monitoring (#13)
awm33 authored Aug 8, 2017
1 parent 32bc173 commit ec733d3
Showing 7 changed files with 330 additions and 4 deletions.
5 changes: 3 additions & 2 deletions setup.py
@@ -5,12 +5,13 @@
 
 setup(
     name='taskflow',
-    version='0.0.2',
+    version='0.1.0',
     packages=find_packages(),
     install_requires=[
         'alembic==0.9.2',
         'boto3==1.4.4',
         'click==6.7',
+        'cron-descriptor==1.2.10',
         'croniter==0.3.17',
         'Flask==0.12.2',
         'Flask-Cors==3.0.2',
@@ -27,6 +28,6 @@
         'toposort==1.5'
     ],
     dependency_links=[
-        'https://github.com/CityOfPhiladelphia/restful-ben/tarball/master#egg=restful_ben-0.1.0'
+        'https://github.com/CityOfPhiladelphia/restful-ben/tarball/0.0.1#egg=restful_ben-0.1.0'
     ],
 )
2 changes: 2 additions & 0 deletions taskflow/core/models.py
@@ -269,6 +269,8 @@ def add_push_worker(self, push_worker):
         self._push_workers[push_worker.push_type] = push_worker
 
     def get_push_worker(self, push_type):
+        if push_type not in self._push_workers:
+            return None
         return self._push_workers[push_type]
 
     def sync_tasks(self, session, tasks):
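The added guard makes `get_push_worker` return `None` for unregistered push types instead of raising `KeyError`. A minimal standalone sketch of the same contract (the `Registry` and `DummyWorker` names here are hypothetical, used only for illustration):

# Hypothetical illustration of the None-returning lookup added above.
class Registry(object):
    def __init__(self):
        self._push_workers = {}  # push_type -> push worker instance

    def add_push_worker(self, push_worker):
        self._push_workers[push_worker.push_type] = push_worker

    def get_push_worker(self, push_type):
        # Equivalent to the added guard: dict.get returns None on a miss.
        return self._push_workers.get(push_type)

class DummyWorker(object):
    push_type = 'aws_batch'

registry = Registry()
registry.add_push_worker(DummyWorker())
assert registry.get_push_worker('aws_batch').push_type == 'aws_batch'
assert registry.get_push_worker('unknown') is None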
4 changes: 2 additions & 2 deletions taskflow/monitoring/aws.py
@@ -1,8 +1,8 @@
 import boto3
 
-from .base import Monitor
+from .base import MonitorDestination
 
-class AWSMonitor(Monitor):
+class AWSMonitor(MonitorDestination):
     def __init__(self,
                  metric_prefix='',
                  metric_namespace='taskflow',
34 changes: 34 additions & 0 deletions taskflow/monitoring/base.py
@@ -1,5 +1,39 @@
 import logging
 
 class Monitor(object):
+    def __init__(self, destinations=None):
+        self.logger = logging.getLogger('Monitoring')
+
+        self.destinations = destinations or []
+
+    def call_destinations(self, fn_name, *args):
+        for destination in self.destinations:
+            try:
+                getattr(destination, fn_name)(*args)
+            except Exception:
+                self.logger.exception('Exception trying to call `{}` on the `{}` monitor.'\
+                    .format(fn_name, destination.__class__.__name__))
+
+    def heartbeat_scheduler(self):
+        self.call_destinations('heartbeat_scheduler')
+
+    def task_retry(self, task_instance):
+        self.call_destinations('task_retry', task_instance)
+
+    def task_failed(self, task_instance):
+        self.call_destinations('task_failed', task_instance)
+
+    def task_success(self, task_instance):
+        self.call_destinations('task_success', task_instance)
+
+    def workflow_failed(self, workflow_instance):
+        self.call_destinations('workflow_failed', workflow_instance)
+
+    def workflow_success(self, workflow_instance):
+        self.call_destinations('workflow_success', workflow_instance)
+
+
+class MonitorDestination(object):
     def heartbeat_scheduler(self):
         pass
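`Monitor` fans each event out to every registered `MonitorDestination`, and `call_destinations` logs a destination's exception rather than letting it interrupt the remaining destinations. A minimal sketch of that contract, assuming the package is importable; `PrintDestination` and `BrokenDestination` are hypothetical stand-ins for real destinations such as `AWSMonitor` or `SlackMonitor`:

from taskflow.monitoring.base import Monitor, MonitorDestination

class PrintDestination(MonitorDestination):
    # Hypothetical destination used only for illustration.
    def task_failed(self, task_instance):
        print('task failed: {}'.format(task_instance))

class BrokenDestination(MonitorDestination):
    # Demonstrates that one failing destination does not stop the others.
    def task_failed(self, task_instance):
        raise RuntimeError('boom')

monitor = Monitor(destinations=[BrokenDestination(), PrintDestination()])
monitor.task_failed('fake-task-instance')  # logs the RuntimeError, then still prints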
142 changes: 142 additions & 0 deletions taskflow/monitoring/slack.py
@@ -0,0 +1,142 @@
import os
import re
import json

from cron_descriptor import get_description
import requests

from taskflow import WorkflowInstance, TaskInstance
from .base import MonitorDestination

class SlackMonitor(MonitorDestination):
    def __init__(self, session, taskflow, slack_url=None):
        self.slack_url = slack_url or os.getenv('SLACK_WEBHOOK_URL')
        self.session = session
        self.taskflow = taskflow

    def get_log_url(self, failed_task):
        push_destination = self.taskflow.get_task(failed_task.task_name).push_destination
        push_worker = self.taskflow.get_push_worker(push_destination)
        if push_worker and hasattr(push_worker, 'get_log_url'):
            return push_worker.get_log_url(failed_task)
        return None

    def get_message(self, item):
        failed_task_attachments = []

        if isinstance(item, WorkflowInstance):
            item_type = 'Workflow'
            name = item.workflow_name

            failed_tasks = self.session.query(TaskInstance)\
                .filter(TaskInstance.workflow_instance_id == item.id,
                        TaskInstance.status == 'failed')\
                .all()

            for failed_task in failed_tasks:
                failed_task_attachments.append({
                    'title': 'Failed Workflow Task',
                    'color': '#ff0000',
                    'fields': [
                        {
                            'title': 'Task',
                            'value': failed_task.task_name
                        },
                        {
                            'title': 'ID',
                            'value': failed_task.id
                        },
                        {
                            'title': 'Number of Attempts',
                            'value': failed_task.attempts
                        },
                        {
                            'title': 'Logs',
                            'value': self.get_log_url(failed_task)
                        }
                    ]
                })
        else:
            item_type = 'Task'
            name = item.task_name

        attachments = [{
            'title': '{} Failure'.format(item_type),
            'text': '<!channel> A {} in Taskflow failed'.format(item_type.lower()),
            'color': '#ff0000',
            'fields': [
                {
                    'title': item_type,
                    'value': name,
                    'short': False
                },
                {
                    'title': 'ID',
                    'value': item.id
                },
                {
                    'title': 'Priority',
                    'value': item.priority
                },
                {
                    'title': 'Scheduled Run Time',
                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.run_at)
                },
                {
                    'title': 'Start Time',
                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.started_at)
                },
                {
                    'title': 'Failure Time',
                    'value': '{:%Y-%m-%d %H:%M:%S}'.format(item.ended_at)
                }
            ]
        }]

        if item.scheduled:
            workflow = self.taskflow.get_workflow(item.workflow_name)
            attachments[0]['fields'].append({
                'title': 'Schedule',
                'value': '{} ({})'.format(get_description(workflow.schedule), workflow.schedule)
            })

        if failed_task_attachments:
            attachments += failed_task_attachments
        else:
            attachments[0]['fields'].append({
                'title': 'Number of Attempts',
                'value': item.attempts
            })
            attachments[0]['fields'].append({
                'title': 'Logs',
                'value': self.get_log_url(item)
            })

        return {'attachments': attachments}

    def send_to_slack(self, message):
        requests.post(
            self.slack_url,
            data=json.dumps(message),
            headers={'Content-Type': 'application/json'})

    def heartbeat_scheduler(self):
        pass

    def task_retry(self, task_instance):
        pass

    def task_failed(self, task_instance):
        # Only alert on tasks not associated with a workflow;
        # task failures bubble up to the workflow alert.
        if task_instance.workflow_instance_id is None:
            message = self.get_message(task_instance)
            self.send_to_slack(message)

    def task_success(self, task_instance):
        pass

    def workflow_failed(self, workflow_instance):
        message = self.get_message(workflow_instance)
        self.send_to_slack(message)

    def workflow_success(self, workflow_instance):
        pass
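Putting the pieces together: a sketch of how `SlackMonitor` might be attached to the fan-out `Monitor`, assuming `Taskflow` is importable from the top-level package (as the test below suggests) and that a real SQLAlchemy session is available; the `session` value and webhook URL here are placeholders:

import os

from taskflow import Taskflow
from taskflow.monitoring.base import Monitor
from taskflow.monitoring.slack import SlackMonitor

taskflow = Taskflow()
session = None  # placeholder; pass a real SQLAlchemy session in practice

slack = SlackMonitor(session, taskflow,
                     slack_url=os.getenv('SLACK_WEBHOOK_URL'))
monitor = Monitor(destinations=[slack])

# A workflow failure now POSTs an attachment payload to the webhook:
# monitor.workflow_failed(workflow_instance)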
13 changes: 13 additions & 0 deletions taskflow/push_workers/aws_batch.py
@@ -17,6 +17,18 @@ def __init__(self, *args, default_job_queue=None, default_job_definition=None, **kwargs):
         self.default_job_queue = default_job_queue
         self.default_job_definition = default_job_definition
 
+    def get_log_url(self, task_instance):
+        try:
+            push_state = task_instance.push_state
+            if push_state and 'taskArn' in push_state:
+                # re.search, not re.match: a full ECS taskArn does not start with 'task/'
+                task_id = re.search(r'task/(.+)', push_state['taskArn']).groups()[0]
+
+                return '{}/{}/{}'.format(push_state['jobName'][:50], push_state['jobId'], task_id)
+        except Exception:
+            self.logger.exception('Exception getting AWS Batch log URL')
+
+        return None
+
     def sync_task_instance_states(self, session, dry_run, task_instances):
         jobs = dict()
         for task_instance in task_instances:
@@ -41,6 +53,7 @@ def sync_task_instance_states(self, session, dry_run, task_instances):
             task_instance = jobs[job['jobId']]
             if task_instance.status != status:
                 task_instance.status = status
+                task_instance.push_state = job
 
         if not dry_run:
             session.commit()
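`get_log_url` derives a log path from the job description that `sync_task_instance_states` now stores in `push_state`. A standalone sketch of the same extraction, using a made-up ARN and job fields (all values here are hypothetical):

import re

# Hypothetical push_state, shaped like the stored AWS Batch job description.
push_state = {
    'jobName': 'example-job',
    'jobId': '11111111-2222-3333-4444-555555555555',
    'taskArn': 'arn:aws:ecs:us-east-1:123456789012:task/9f3b7c1e-example',
}

match = re.search(r'task/(.+)', push_state['taskArn'])
task_id = match.groups()[0] if match else None
print('{}/{}/{}'.format(push_state['jobName'][:50], push_state['jobId'], task_id))
# -> example-job/11111111-2222-3333-4444-555555555555/9f3b7c1e-example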
134 changes: 134 additions & 0 deletions tests/test_monitoring.py
@@ -0,0 +1,134 @@
from datetime import datetime

import requests_mock
from restful_ben.test_utils import dict_contains

from taskflow import WorkflowInstance, TaskInstance
from taskflow.monitoring.slack import SlackMonitor

# Provides the dbsession and workflows fixtures (and Taskflow) used below.
from shared_fixtures import *

def test_slack_monitor(dbsession, workflows):
    taskflow = Taskflow()
    taskflow.add_workflows(workflows)

    workflow_instance = WorkflowInstance(
        workflow_name='workflow1',
        scheduled=True,
        run_at=datetime(2017, 6, 3, 6),
        started_at=datetime(2017, 6, 3, 6),
        ended_at=datetime(2017, 6, 3, 6, 0, 36),
        status='running',
        priority='normal')
    dbsession.add(workflow_instance)
    dbsession.commit()
    task_instance1 = TaskInstance(
        task_name='task1',
        scheduled=True,
        workflow_instance_id=workflow_instance.id,
        status='success',
        run_at=datetime(2017, 6, 3, 6, 0, 34),
        attempts=1,
        priority='normal',
        push=False,
        timeout=300,
        retry_delay=300)
    task_instance2 = TaskInstance(
        task_name='task2',
        scheduled=True,
        workflow_instance_id=workflow_instance.id,
        status='success',
        run_at=datetime(2017, 6, 3, 6, 0, 34),
        attempts=1,
        priority='normal',
        push=False,
        timeout=300,
        retry_delay=300)
    task_instance3 = TaskInstance(
        task_name='task3',
        scheduled=True,
        workflow_instance_id=workflow_instance.id,
        status='failed',
        run_at=datetime(2017, 6, 3, 6, 0, 34),
        attempts=1,
        priority='normal',
        push=False,
        timeout=300,
        retry_delay=300)
    dbsession.add(task_instance1)
    dbsession.add(task_instance2)
    dbsession.add(task_instance3)
    dbsession.commit()

    slack_url = 'http://fakeurl.com/foo'

    slack_monitor = SlackMonitor(dbsession, taskflow, slack_url=slack_url)

    with requests_mock.Mocker() as m:
        m.post(slack_url)

        slack_monitor.workflow_failed(workflow_instance)

        assert m.called
        assert m.call_count == 1

        request = m.request_history[0]
        data = request.json()
        assert dict_contains(data['attachments'][0], {
            'text': '<!channel> A workflow in Taskflow failed',
            'title': 'Workflow Failure',
            'color': '#ff0000',
            'fields': [
                {
                    'title': 'Workflow',
                    'short': False,
                    'value': 'workflow1'
                },
                {
                    'title': 'ID',
                    'value': 1
                },
                {
                    'title': 'Priority',
                    'value': 'normal'
                },
                {
                    'title': 'Scheduled Run Time',
                    'value': '2017-06-03 06:00:00'
                },
                {
                    'title': 'Start Time',
                    'value': '2017-06-03 06:00:00'
                },
                {
                    'title': 'Failure Time',
                    'value': '2017-06-03 06:00:36'
                },
                {
                    'title': 'Schedule',
                    'value': 'At 06:00 AM (0 6 * * *)'
                }
            ]
        })
        assert dict_contains(data['attachments'][1], {
            'title': 'Failed Workflow Task',
            'color': '#ff0000',
            'fields': [
                {
                    'title': 'Task',
                    'value': 'task3'
                },
                {
                    'title': 'ID',
                    'value': 3
                },
                {
                    'title': 'Number of Attempts',
                    'value': 1
                },
                {
                    'title': 'Logs',
                    'value': None
                }
            ]
        })
