feat(sentry apps): Add context manager for sentry apps and impl for event webhooks #86136

Draft
wants to merge 10 commits into master
42 changes: 42 additions & 0 deletions src/sentry/sentry_apps/metrics.py
@@ -0,0 +1,42 @@
from collections.abc import Mapping
from dataclasses import dataclass
from enum import StrEnum
from typing import Any

from sentry.integrations.types import EventLifecycleOutcome
from sentry.integrations.utils.metrics import EventLifecycleMetric


class SentryAppInteractionType(StrEnum):
"""Actions that Sentry Apps can do"""

# Webhook actions
PREPARE_WEBHOOK = "prepare_webhook"
SEND_WEBHOOK = "send_webhook"


@dataclass
class SentryAppInteractionEvent(EventLifecycleMetric):
"""An event under the Sentry App umbrella"""

operation_type: SentryAppInteractionType
event_type: str

def get_metric_key(self, outcome: EventLifecycleOutcome) -> str:
tokens = ("sentry_app", self.operation_type, str(outcome))
return ".".join(tokens)

def get_event_type(self) -> str:
return self.event_type if self.event_type else ""

def get_metric_tags(self) -> Mapping[str, str]:
return {
"operation_type": self.operation_type,
"event_type": self.event_type,
}

def get_extras(self) -> Mapping[str, Any]:
return {
"event_type": self.get_event_type(),
"operation_type": self.operation_type,
}
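
To make the intended call pattern concrete, here is a minimal, hedged usage sketch of the new metric class. It mirrors the pattern adopted in the task module below; the event name "issue.created" and the wrapper function prepare_webhook_for_issue are hypothetical, while capture(), add_extras() and record_failure() are the existing EventLifecycle APIs this PR relies on.

from sentry.sentry_apps.metrics import (
    SentryAppInteractionEvent,
    SentryAppInteractionType,
)


def prepare_webhook_for_issue(issue_id: int) -> None:
    # Hypothetical wrapper, for illustration only.
    with SentryAppInteractionEvent(
        operation_type=SentryAppInteractionType.PREPARE_WEBHOOK,
        event_type="issue.created",  # made-up event name
    ).capture() as lifecycle:
        # Entering the block records a STARTED outcome under the
        # "sentry_app.prepare_webhook.*" key; assuming the existing
        # EventLifecycle semantics, a clean exit records SUCCESS.
        lifecycle.add_extras(extras={"issue_id": issue_id})
        if issue_id <= 0:
            # Mark the operation as failed and bail out early.
            lifecycle.record_failure(failure_reason="invalid_issue_id")
            return
        # ... build and enqueue the webhook payload here ...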
184 changes: 105 additions & 79 deletions src/sentry/sentry_apps/tasks/sentry_apps.py
@@ -5,6 +5,7 @@
from collections.abc import Mapping, Sequence
from typing import Any

import sentry_sdk
from celery import Task, current_task
from django.urls import reverse
from requests.exceptions import RequestException
@@ -22,6 +23,7 @@
from sentry.models.organizationmapping import OrganizationMapping
from sentry.models.project import Project
from sentry.sentry_apps.api.serializers.app_platform_event import AppPlatformEvent
from sentry.sentry_apps.metrics import SentryAppInteractionEvent, SentryAppInteractionType
from sentry.sentry_apps.models.sentry_app import VALID_EVENTS, SentryApp
from sentry.sentry_apps.models.sentry_app_installation import SentryAppInstallation
from sentry.sentry_apps.models.servicehook import ServiceHook, ServiceHookProject
@@ -32,6 +34,7 @@
get_installation,
get_installations_for_organization,
)
from sentry.sentry_apps.utils.errors import SentryAppSentryError
from sentry.shared_integrations.exceptions import ApiHostError, ApiTimeoutError, ClientError
from sentry.silo.base import SiloMode
from sentry.tasks.base import instrumented_task, retry
@@ -214,73 +217,88 @@ def _process_resource_change(
retryer: Task | None = None,
**kwargs: Any,
) -> None:
# The class is serialized as a string when enqueueing the class.
model: type[Event] | type[Model] = TYPES[sender]
instance: Event | Model | None = None

project_id: int | None = kwargs.get("project_id", None)
group_id: int | None = kwargs.get("group_id", None)
if sender == "Error" and project_id and group_id:
# Read event from nodestore as Events are heavy in task messages.
nodedata = nodestore.backend.get(Event.generate_node_id(project_id, str(instance_id)))
if not nodedata:
extra = {"sender": sender, "action": action, "event_id": instance_id}
logger.info("process_resource_change.event_missing_event", extra=extra)
with SentryAppInteractionEvent(
operation_type=SentryAppInteractionType.PREPARE_WEBHOOK,
event_type=f"process_resource_change.{sender}_{action}",
Contributor Author: We don't have an 'event' string at this point, so we make one up from the sender and action.

).capture() as lifecycle:

# The class is serialized as a string when enqueueing the class.
model: type[Event] | type[Model] = TYPES[sender]
instance: Event | Model | None = None

project_id: int | None = kwargs.get("project_id", None)
group_id: int | None = kwargs.get("group_id", None)
if sender == "Error" and project_id and group_id:
# Read event from nodestore as Events are heavy in task messages.
nodedata = nodestore.backend.get(Event.generate_node_id(project_id, str(instance_id)))
if not nodedata:
extra = {"sender": sender, "action": action, "event_id": instance_id}
lifecycle.record_failure(
failure_reason="process_resource_change.event_missing_event", extra=extra
)
return
instance = Event(
project_id=project_id, group_id=group_id, event_id=str(instance_id), data=nodedata
)
name = sender.lower()
else:
# Some resources are named differently than their model. eg. Group vs Issue.
# Looks up the human name for the model. Defaults to the model name.
name = RESOURCE_RENAMES.get(model.__name__, model.__name__.lower())

# By default, use Celery's `current_task` but allow a value to be passed for the
# bound Task.
retryer = retryer or current_task

# We may run into a race condition where this task executes before the
# transaction that creates the Group has committed.
if not issubclass(model, Event):
try:
instance = model.objects.get(id=instance_id)
except model.DoesNotExist as e:
# Explicitly requeue the task, so we don't report this to Sentry until
# we hit the max number of retries.
return retryer.retry(exc=e)

event = f"{name}.{action}"
lifecycle.add_extras(extras={"event_name": event, "instance_id": instance_id})

if event not in VALID_EVENTS:
lifecycle.record_failure(
failure_reason="invalid_event",
)
return
instance = Event(
project_id=project_id, group_id=group_id, event_id=str(instance_id), data=nodedata
)
name = sender.lower()
else:
# Some resources are named differently than their model. eg. Group vs Issue.
# Looks up the human name for the model. Defaults to the model name.
name = RESOURCE_RENAMES.get(model.__name__, model.__name__.lower())

# By default, use Celery's `current_task` but allow a value to be passed for the
# bound Task.
retryer = retryer or current_task

# We may run into a race condition where this task executes before the
# transaction that creates the Group has committed.
if not issubclass(model, Event):
try:
instance = model.objects.get(id=instance_id)
except model.DoesNotExist as e:
# Explicitly requeue the task, so we don't report this to Sentry until
# we hit the max number of retries.
return retryer.retry(exc=e)

event = f"{name}.{action}"

if event not in VALID_EVENTS:
return

org = None
org = None

if isinstance(instance, (Group, Event, GroupEvent)):
org = Organization.objects.get_from_cache(
id=Project.objects.get_from_cache(id=instance.project_id).organization_id
)
assert org, "organization must exist to get related sentry app installations"

installations = [
installation
for installation in app_service.installations_for_organization(organization_id=org.id)
if event in installation.sentry_app.events
]

for installation in installations:
data = {}
if isinstance(instance, (Event, GroupEvent)):
assert instance.group_id, "group id is required to create webhook event data"
data[name] = _webhook_event_data(instance, instance.group_id, instance.project_id)
else:
data[name] = serialize(instance)

# Trigger a new task for each webhook
send_resource_change_webhook.delay(
installation_id=installation.id, event=event, data=data
if isinstance(instance, (Group, Event, GroupEvent)):
org = Organization.objects.get_from_cache(
id=Project.objects.get_from_cache(id=instance.project_id).organization_id
)
assert org, "organization must exist to get related sentry app installations"

installations = [
installation
for installation in app_service.installations_for_organization(
organization_id=org.id
)
if event in installation.sentry_app.events
]

for installation in installations:
data = {}
if isinstance(instance, (Event, GroupEvent)):
assert instance.group_id, "group id is required to create webhook event data"
data[name] = _webhook_event_data(
instance, instance.group_id, instance.project_id
)
else:
data[name] = serialize(instance)

# Trigger a new task for each webhook
send_resource_change_webhook.delay(
installation_id=installation.id, event=event, data=data
)


@instrumented_task(
@@ -448,17 +466,28 @@ def get_webhook_data(
def send_resource_change_webhook(
installation_id: int, event: str, data: dict[str, Any], *args: Any, **kwargs: Any
) -> None:
installation = app_service.installation_by_id(id=installation_id)
if not installation:
logger.info(
"send_resource_change_webhook.missing_installation",
extra={"installation_id": installation_id, "event": event},
)
return
with SentryAppInteractionEvent(
operation_type=SentryAppInteractionType.SEND_WEBHOOK, event_type=event
).capture() as lifecycle:
installation = app_service.installation_by_id(id=installation_id)
if not installation:
logger.info(
"send_resource_change_webhook.missing_installation",
extra={"installation_id": installation_id, "event": event},
)
return

send_webhooks(installation, event, data=data)
try:
send_webhooks(installation, event, data=data)
except SentryAppSentryError as e:
sentry_sdk.capture_exception(e)
lifecycle.record_failure(e)
return None
except (ApiHostError, ApiTimeoutError, RequestException, ClientError) as e:
lifecycle.record_halt(e)
Contributor Author: ApiHostError and ApiTimeoutError are raised from send_and_save_webhook_request when the response is a 503 or 504. RequestException occurs when we get a Timeout or ConnectionError, which I also wouldn't consider our server's fault. ClientError covers anything below 500 and likewise isn't our fault, since it's the third party's responsibility to properly consume our requests. We re-raise because the TASK_OPTIONS already specify the retry logic (i.e. which errors to retry or ignore).

raise

metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event})
metrics.incr("resource_change.processed", sample_rate=1.0, tags={"change_event": event})


def notify_sentry_app(event: GroupEvent, futures: Sequence[RuleFuture]):
@@ -493,19 +522,15 @@ def notify_sentry_app(event: GroupEvent, futures: Sequence[RuleFuture]):

def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs: Any) -> None:
servicehook: ServiceHook
extras: dict[str, int | str] = {"installation_id": installation.id, "event": event}
try:
servicehook = ServiceHook.objects.get(
organization_id=installation.organization_id, actor_id=installation.id
)
except ServiceHook.DoesNotExist:
logger.info(
"send_webhooks.missing_servicehook",
extra={"installation_id": installation.id, "event": event},
)
return None

raise SentryAppSentryError("send_webhooks.missing_servicehook", webhook_context=extras)
if event not in servicehook.events:
return None
raise SentryAppSentryError("send_webhooks.event_not_in_servicehook", webhook_context=extras)

# The service hook applies to all projects if there are no
# ServiceHookProject records. Otherwise we want to check if
@@ -535,6 +560,7 @@ def send_webhooks(installation: RpcSentryAppInstallation, event: str, **kwargs:
kwargs["install"] = installation

request_data = AppPlatformEvent(**kwargs)

send_and_save_webhook_request(
installation.sentry_app,
request_data,
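As the author's comment on the exception handling above explains, receiver-side problems (503/504s, timeouts, connection errors, 4xx responses) are recorded as halts, while violations of our own invariants (such as a missing ServiceHook) are recorded as failures, so the two are counted under different metric keys. A small sketch of the keys produced by SentryAppInteractionEvent.get_metric_key follows; the event name is hypothetical, and the exact suffixes assume EventLifecycleOutcome stringifies to its lower-cased member name.

from sentry.integrations.types import EventLifecycleOutcome
from sentry.sentry_apps.metrics import (
    SentryAppInteractionEvent,
    SentryAppInteractionType,
)

metric = SentryAppInteractionEvent(
    operation_type=SentryAppInteractionType.SEND_WEBHOOK,
    event_type="issue.created",  # hypothetical event name
)

# Halted deliveries and failed ones land under separate outcome suffixes,
# so dashboards can distinguish third-party flakiness from our own bugs.
for outcome in (EventLifecycleOutcome.HALTED, EventLifecycleOutcome.FAILURE):
    print(metric.get_metric_key(outcome))
# Assuming outcomes stringify to their lower-cased names, this prints:
#   sentry_app.send_webhook.halted
#   sentry_app.send_webhook.failure
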
11 changes: 11 additions & 0 deletions src/sentry/testutils/asserts.py
@@ -1,3 +1,5 @@
from functools import reduce

from django.http import StreamingHttpResponse

from sentry.integrations.types import EventLifecycleOutcome
@@ -110,3 +112,12 @@ def assert_middleware_metrics(middleware_calls):
assert end1.args[0] == EventLifecycleOutcome.SUCCESS
assert start2.args[0] == EventLifecycleOutcome.STARTED
assert end2.args[0] == EventLifecycleOutcome.SUCCESS


def assert_count_of_metric(mock_record, outcome, outcome_count):
calls = reduce(
lambda acc, calls: (acc + 1 if calls.args[0] == outcome else acc),
mock_record.mock_calls,
0,
)
assert calls == outcome_count
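
A hedged sketch of how the new assertion helper might be used in a test. The patch target is an assumption (the lifecycle's record method is expected to live on EventLifecycle in sentry.integrations.utils.metrics), and trigger_send_webhook is a hypothetical stand-in for whatever exercises the instrumented task.

from unittest import mock

from sentry.integrations.types import EventLifecycleOutcome
from sentry.testutils.asserts import assert_count_of_metric


# Patch target assumed; point it at wherever the lifecycle records outcomes.
@mock.patch("sentry.integrations.utils.metrics.EventLifecycle.record_event")
def test_send_webhook_emits_slo_metrics(mock_record):
    trigger_send_webhook()  # hypothetical: exercise the instrumented code path

    # Expect exactly one STARTED and one SUCCESS recording for the operation.
    assert_count_of_metric(
        mock_record, outcome=EventLifecycleOutcome.STARTED, outcome_count=1
    )
    assert_count_of_metric(
        mock_record, outcome=EventLifecycleOutcome.SUCCESS, outcome_count=1
    )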