Skip to content

Commit

Permalink
feat(insights): add custom function support to EAP (#85662)
Browse files Browse the repository at this point in the history
Work for #81750 

1. Adds the ability to define custom functions (formulas)
2. Implement the `http_response_rate` function, which is used in
insights
  • Loading branch information
DominikB2014 authored Mar 3, 2025
1 parent d10637f commit ffa89a6
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 66 deletions.
116 changes: 95 additions & 21 deletions src/sentry/search/eap/columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any

from dateutil.tz import tz
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import (
AttributeAggregation,
Expand Down Expand Up @@ -77,31 +78,14 @@ class ArgumentDefinition:
argument_types: set[constants.SearchType] | None = None
# The public alias for the default arg, the SearchResolver will resolve this value
default_arg: str | None = None
# Sets the argument as an attribute, for custom functions like `http_response_rate` we might have non-attribute parameters
is_attribute: bool = True
# Validator to check if the value is allowed for this argument
validator: Callable[[Any], Any] | None = None
# Whether this argument is completely ignored, used for `count()`
ignored: bool = False


@dataclass
class FunctionDefinition:
    """Definition of an aggregate function (eg. `count()`) and how it maps to the RPC."""

    # The RPC function this definition maps to
    internal_function: Function.ValueType
    # The list of arguments for this function
    arguments: list[ArgumentDefinition]
    # The search_type the argument should be the default type for this column
    default_search_type: constants.SearchType
    # Try to infer the search type from the function arguments
    infer_search_type_from_arguments: bool = True
    # The internal rpc type for this function, optional as it can mostly be inferred from search_type
    internal_type: AttributeKey.Type.ValueType | None = None
    # Processor is the function run in the post process step to transform a row into the final result
    processor: Callable[[Any], Any] | None = None
    # Whether to request extrapolation or not, should be true for all functions except for _sample functions for debugging
    extrapolation: bool = True

    @property
    def required_arguments(self) -> list[ArgumentDefinition]:
        # Arguments with a default, or marked ignored, don't have to be supplied by the caller
        return [arg for arg in self.arguments if arg.default_arg is None and not arg.ignored]


@dataclass
class VirtualColumnDefinition:
constructor: Callable[[SnubaParams], VirtualColumnContext]
Expand All @@ -117,6 +101,24 @@ class VirtualColumnDefinition:
default_value: str | None = None


@dataclass(frozen=True, kw_only=True)
class ResolvedFormula(ResolvedAttribute):
    """A custom function (eg. `http_response_rate`) resolved into a binary formula
    that the RPC evaluates, rather than a single aggregation."""

    # The formula tree sent to the RPC in place of an AttributeAggregation
    formula: Column.BinaryFormula

    @property
    def proto_definition(self) -> Column.BinaryFormula:
        """The definition of this function as needed by the RPC"""
        return self.formula

    @property
    def proto_type(self) -> AttributeKey.Type.ValueType:
        """The rpc always returns functions as floats, especially count() even though it should be an integer
        see: https://www.notion.so/sentry/Should-count-return-an-int-in-the-v1-RPC-API-1348b10e4b5d80498bfdead194cc304e
        """
        return constants.DOUBLE


@dataclass(frozen=True, kw_only=True)
class ResolvedFunction(ResolvedAttribute):
# The internal rpc alias for this column
Expand Down Expand Up @@ -147,6 +149,77 @@ def proto_type(self) -> AttributeKey.Type.ValueType:
return constants.DOUBLE


@dataclass
class FunctionDefinition:
    """Definition of an aggregate function and how to resolve it to a ResolvedFunction."""

    internal_function: Function.ValueType
    # The list of arguments for this function
    arguments: list[ArgumentDefinition]
    # The search_type the argument should be the default type for this column
    default_search_type: constants.SearchType
    # Try to infer the search type from the function arguments
    infer_search_type_from_arguments: bool = True
    # The internal rpc type for this function, optional as it can mostly be inferred from search_type
    internal_type: AttributeKey.Type.ValueType | None = None
    # Processor is the function run in the post process step to transform a row into the final result
    processor: Callable[[Any], Any] | None = None
    # Whether to request extrapolation or not, should be true for all functions except for _sample functions for debugging
    extrapolation: bool = True

    @property
    def required_arguments(self) -> list[ArgumentDefinition]:
        """Arguments the caller must supply: those without a default that aren't ignored."""
        required = []
        for argument in self.arguments:
            if argument.ignored or argument.default_arg is not None:
                continue
            required.append(argument)
        return required

    def resolve(
        self, alias: str, search_type: constants.SearchType, resolved_argument: AttributeKey | None
    ) -> ResolvedFunction:
        """Build the ResolvedFunction for this definition under the given public alias."""
        return ResolvedFunction(
            public_alias=alias,
            internal_name=self.internal_function,
            search_type=search_type,
            argument=resolved_argument,
            internal_type=self.internal_type,
            processor=self.processor,
            extrapolation=self.extrapolation,
        )


@dataclass
class FormulaDefinition:
    """Definition of a custom function (eg. `http_response_rate`) whose result the RPC
    computes as a `Column.BinaryFormula` instead of a single aggregation."""

    # The list of arguments for this function
    arguments: list[ArgumentDefinition]
    # A function that takes in the resolved argument and returns a Column.BinaryFormula.
    # Annotated with its documented return type: `resolve` passes the result straight
    # into `ResolvedFormula.formula`, which is a Column.BinaryFormula.
    formula_resolver: Callable[[Any], Column.BinaryFormula]
    # The search_type the argument should be the default type for this column
    default_search_type: constants.SearchType
    # Try to infer the search type from the function arguments
    infer_search_type_from_arguments: bool = True
    # The internal rpc type for this function, optional as it can mostly be inferred from search_type
    internal_type: AttributeKey.Type.ValueType | None = None
    # Processor is the function run in the post process step to transform a row into the
    # final result (same contract as FunctionDefinition.processor, hence Any return)
    processor: Callable[[Any], Any] | None = None
    # Whether to request extrapolation or not, should be true for all functions except for _sample functions for debugging
    extrapolation: bool = True

    @property
    def required_arguments(self) -> list[ArgumentDefinition]:
        """Arguments the caller must supply: those without a default that aren't ignored."""
        return [arg for arg in self.arguments if arg.default_arg is None and not arg.ignored]

    def resolve(
        self,
        alias: str,
        search_type: constants.SearchType,
        resolved_argument: AttributeKey | Any | None,
    ) -> ResolvedFormula:
        """Resolve this definition into a ResolvedFormula under `alias`.

        Unlike FunctionDefinition.resolve, the argument may be a non-attribute value
        (eg. an int), which is fed to `formula_resolver` to build the formula tree.
        """
        return ResolvedFormula(
            public_alias=alias,
            search_type=search_type,
            formula=self.formula_resolver(resolved_argument),
            argument=resolved_argument,
            internal_type=self.internal_type,
            processor=self.processor,
        )


def simple_sentry_field(field) -> ResolvedColumn:
"""For a good number of fields, the public alias matches the internal alias
without the `sentry.` suffix. This helper functions makes defining them easier"""
Expand Down Expand Up @@ -198,6 +271,7 @@ def project_term_resolver(
@dataclass(frozen=True)
class ColumnDefinitions:
    """The full set of public-facing definitions for one trace item type."""

    # Aggregate functions, keyed by public function name
    functions: dict[str, FunctionDefinition]
    # Custom functions that resolve to formulas, keyed by public function name
    formulas: dict[str, FormulaDefinition]
    # Plain attribute columns, keyed by public alias
    columns: dict[str, ResolvedColumn]
    # Virtual column constructors, keyed by public alias
    contexts: dict[str, VirtualColumnDefinition]
    # The RPC trace item type these definitions apply to (eg. span, log)
    trace_item_type: TraceItemType.ValueType
38 changes: 38 additions & 0 deletions src/sentry/search/eap/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,41 @@

PROJECT_FIELDS = {"project", "project.slug", "project.name"}
REVERSE_CONTEXT_ERROR = "Unknown value {} for filter {}, expecting one of: {}"

# HTTP status codes grouped by class (1xx-5xx), as strings.
# Built from ranges for the contiguous runs, with the registered stragglers
# (226, 421-426, 428, 429, 431, 451) appended explicitly.
RESPONSE_CODE_MAP = {
    1: [str(code) for code in range(100, 103)],
    2: [str(code) for code in [*range(200, 209), 226]],
    3: [str(code) for code in range(300, 309)],
    4: [str(code) for code in [*range(400, 419), *range(421, 427), 428, 429, 431, 451]],
    5: [str(code) for code in range(500, 512)],
}
1 change: 1 addition & 0 deletions src/sentry/search/eap/ourlog_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@

OURLOG_DEFINITIONS = ColumnDefinitions(
functions={},
formulas={},
columns=OURLOG_ATTRIBUTE_DEFINITIONS,
contexts=OURLOG_VIRTUAL_CONTEXTS,
trace_item_type=TraceItemType.TRACE_ITEM_TYPE_LOG,
Expand Down
100 changes: 61 additions & 39 deletions src/sentry/search/eap/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass, field
from datetime import datetime
from re import Match
from typing import Literal, cast
from typing import Any, Literal, cast

import sentry_sdk
from parsimonious.exceptions import ParseError
Expand Down Expand Up @@ -37,7 +37,10 @@
from sentry.search.eap import constants
from sentry.search.eap.columns import (
ColumnDefinitions,
FormulaDefinition,
FunctionDefinition,
ResolvedColumn,
ResolvedFormula,
ResolvedFunction,
VirtualColumnDefinition,
)
Expand All @@ -61,9 +64,9 @@ class SearchResolver:
_resolved_attribute_cache: dict[str, tuple[ResolvedColumn, VirtualColumnDefinition | None]] = (
field(default_factory=dict)
)
_resolved_function_cache: dict[str, tuple[ResolvedFunction, VirtualColumnDefinition | None]] = (
field(default_factory=dict)
)
_resolved_function_cache: dict[
str, tuple[ResolvedFunction | ResolvedFormula, VirtualColumnDefinition | None]
] = field(default_factory=dict)

@sentry_sdk.trace
def resolve_meta(self, referrer: str) -> RequestMeta:
Expand Down Expand Up @@ -549,9 +552,10 @@ def resolve_contexts(
return final_contexts

@sentry_sdk.trace
def resolve_columns(
self, selected_columns: list[str]
) -> tuple[list[ResolvedColumn | ResolvedFunction], list[VirtualColumnDefinition | None]]:
def resolve_columns(self, selected_columns: list[str]) -> tuple[
list[ResolvedColumn | ResolvedFunction | ResolvedFormula],
list[VirtualColumnDefinition | None],
]:
"""Given a list of columns resolve them and get their context if applicable
This function will also dedupe the virtual column contexts if necessary
Expand Down Expand Up @@ -586,7 +590,7 @@ def resolve_columns(

def resolve_column(
self, column: str, match: Match | None = None
) -> tuple[ResolvedColumn | ResolvedFunction, VirtualColumnDefinition | None]:
) -> tuple[ResolvedColumn | ResolvedFunction | ResolvedFormula, VirtualColumnDefinition | None]:
"""Column is either an attribute or an aggregate, this function will determine which it is and call the relevant
resolve function"""
match = fields.is_function(column)
Expand Down Expand Up @@ -668,7 +672,7 @@ def resolve_attribute(
@sentry_sdk.trace
def resolve_aggregates(
self, columns: list[str]
) -> tuple[list[ResolvedFunction], list[VirtualColumnDefinition | None]]:
) -> tuple[list[ResolvedFunction | ResolvedFormula], list[VirtualColumnDefinition | None]]:
"""Helper function to resolve a list of aggregates instead of 1 attribute at a time"""
resolved_aggregates, resolved_contexts = [], []
for column in columns:
Expand All @@ -679,10 +683,10 @@ def resolve_aggregates(

def resolve_aggregate(
self, column: str, match: Match | None = None
) -> tuple[ResolvedFunction, VirtualColumnDefinition | None]:
) -> tuple[ResolvedFunction | ResolvedFormula, VirtualColumnDefinition | None]:
if column in self._resolved_function_cache:
return self._resolved_function_cache[column]
# Check if this is a valid function, parse the function name and args out
# Check if the column looks like a function (matches a pattern), parse the function name and args out
if match is None:
match = fields.is_function(column)
if match is None:
Expand All @@ -694,24 +698,47 @@ def resolve_aggregate(
alias = match.group("alias") or column

# Get the function definition
if function not in self.definitions.functions:
function_definition: FunctionDefinition | FormulaDefinition
if function in self.definitions.functions:
function_definition = self.definitions.functions[function]
elif function in self.definitions.formulas:
function_definition = self.definitions.formulas[function]
else:
raise InvalidSearchQuery(f"Unknown function {function}")
function_definition = self.definitions.functions[function]

parsed_columns = []
parsed_args: list[ResolvedColumn | Any] = []

# Parse the arguments
attribute_args = fields.parse_arguments(function, columns)
if len(attribute_args) < len(function_definition.required_arguments):
arguments = fields.parse_arguments(function, columns)
if len(arguments) < len(function_definition.required_arguments):
raise InvalidSearchQuery(
f"Invalid number of arguments for {function}, was expecting {len(function_definition.required_arguments)} arguments"
)

for index, argument in enumerate(function_definition.arguments):
if argument.ignored:
continue
if index < len(attribute_args):
parsed_argument, _ = self.resolve_attribute(attribute_args[index])
if argument.validator is not None:
if not argument.validator(arguments[index]):
raise InvalidSearchQuery(
f"{arguments[index]} is not a valid argument for {function}"
)

if index < len(arguments):
if argument.is_attribute:
parsed_argument, _ = self.resolve_attribute(arguments[index])
else:
if argument.argument_types is None:
parsed_args.append(arguments[index]) # assume it's a string
continue
# TODO: we assume that the argument is only one type for now, and we only support string/integer
for type in argument.argument_types:
if type == "integer":
parsed_args.append(int(arguments[index]))
else:
parsed_args.append(arguments[index])
continue

elif argument.default_arg:
parsed_argument, _ = self.resolve_attribute(argument.default_arg)
else:
Expand All @@ -726,34 +753,29 @@ def resolve_aggregate(
raise InvalidSearchQuery(
f"{argument} is invalid for {function}, its a {parsed_argument.search_type} type field but {function} expects a field that are one of these types: {argument.argument_types}"
)
parsed_columns.append(parsed_argument)
parsed_args.append(parsed_argument)

# Proto doesn't support anything more than 1 argument yet
if len(parsed_columns) > 1:
if len(parsed_args) > 1:
raise InvalidSearchQuery("Cannot use more than one argument")
elif len(parsed_columns) == 1 and isinstance(
parsed_columns[0].proto_definition, AttributeKey
):
parsed_column = parsed_columns[0]
resolved_argument = parsed_column.proto_definition
search_type = (
parsed_column.search_type
if function_definition.infer_search_type_from_arguments
else function_definition.default_search_type
)
elif len(parsed_args) == 1:
parsed_arg = parsed_args[0]
if not isinstance(parsed_arg, ResolvedColumn):
resolved_argument = parsed_arg
search_type = function_definition.default_search_type
elif isinstance(parsed_arg.proto_definition, AttributeKey):
resolved_argument = parsed_arg.proto_definition
search_type = (
parsed_arg.search_type
if function_definition.infer_search_type_from_arguments
else function_definition.default_search_type
)
else:
resolved_argument = None
search_type = function_definition.default_search_type

resolved_function = ResolvedFunction(
public_alias=alias,
internal_name=function_definition.internal_function,
search_type=search_type,
internal_type=function_definition.internal_type,
processor=function_definition.processor,
extrapolation=function_definition.extrapolation,
argument=resolved_argument,
)
resolved_function = function_definition.resolve(alias, search_type, resolved_argument)

resolved_context = None
self._resolved_function_cache[column] = (resolved_function, resolved_context)
return self._resolved_function_cache[column]
Loading

0 comments on commit ffa89a6

Please sign in to comment.