ref(spans): Add an ingest-spans topic for the spans pipeline
jan-auer committed Mar 3, 2025
1 parent: 37496a2 · commit: fabb7cf
Showing 7 changed files with 36 additions and 19 deletions.
1 change: 1 addition & 0 deletions src/sentry/conf/server.py
@@ -2954,6 +2954,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
     "ingest-transactions": "default",
     "ingest-transactions-dlq": "default",
     "ingest-transactions-backlog": "default",
+    "ingest-spans": "default",
     "ingest-metrics": "default",
     "ingest-metrics-dlq": "default",
     "snuba-metrics": "default",
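Note: this maps the new logical topic to the "default" Kafka cluster. As a hedged sketch (assuming this dict is server.py's KAFKA_TOPIC_TO_CLUSTER setting and that a "spans" entry exists in KAFKA_CLUSTERS; the cluster name is illustrative), a deployment could pin the topic to a dedicated cluster instead:

    # Settings override sketch; "spans" must be defined in KAFKA_CLUSTERS.
    KAFKA_TOPIC_TO_CLUSTER = {
        # ... other topic -> cluster entries ...
        "ingest-spans": "spans",
    }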
1 change: 1 addition & 0 deletions src/sentry/conf/types/kafka_definition.py
@@ -37,6 +37,7 @@ class Topic(Enum):
     INGEST_TRANSACTIONS = "ingest-transactions"
     INGEST_TRANSACTIONS_DLQ = "ingest-transactions-dlq"
     INGEST_TRANSACTIONS_BACKLOG = "ingest-transactions-backlog"
+    INGEST_SPANS = "ingest-spans"
     INGEST_METRICS = "ingest-metrics"
     INGEST_METRICS_DLQ = "ingest-metrics-dlq"
     SNUBA_METRICS = "snuba-metrics"
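For orientation, a minimal sketch (not part of this commit) of how the new enum member is consumed by the files changed below; the import paths are assumptions inferred from how those files use the helpers:

    from sentry.conf.types.kafka_definition import Topic, get_topic_codec
    from sentry.utils.kafka_config import get_topic_definition

    # Resolve the logical topic to the deployment's physical topic name.
    real_name = get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"]

    # Fetch the codec that validates payloads against the ingest-spans schema.
    codec = get_topic_codec(Topic.INGEST_SPANS)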
2 changes: 1 addition & 1 deletion src/sentry/consumers/__init__.py
@@ -442,7 +442,7 @@ def ingest_transactions_options() -> list[click.Option]:
         },
     },
     "process-spans": {
-        "topic": Topic.SNUBA_SPANS,
+        "topic": Topic.INGEST_SPANS,
         "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
         "click_options": multiprocessing_options(default_max_batch_size=100),
     },
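With this registry change the existing process-spans consumer simply reads from the new topic. Assuming Sentry's standard consumer entry point, it is still launched as "sentry run consumer process-spans --consumer-group <group>"; the topic is resolved from the registry entry above, so no CLI changes are needed.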
2 changes: 1 addition & 1 deletion src/sentry/spans/consumers/process/factory.py
@@ -38,7 +38,7 @@

 logger = logging.getLogger(__name__)

-SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.SNUBA_SPANS)
+SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.INGEST_SPANS)
 MAX_PAYLOAD_SIZE = 10 * 1000 * 1000  # 10 MB

 BATCH_SIZE = 100
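Only the codec's source topic changes here; MAX_PAYLOAD_SIZE and the batching constants are untouched. The practical effect, assuming the ingest-spans schema in sentry-kafka-schemas mirrors the old snuba-spans one, is that inbound messages are now validated against the ingest-spans topic definition.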
29 changes: 22 additions & 7 deletions src/sentry/spans/consumers/process_segments/factory.py
@@ -2,11 +2,14 @@
 from collections.abc import Mapping
 from typing import Any

+import rapidjson
 import sentry_sdk
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
 from arroyo.processing.strategies.commit import CommitOffsets
-from arroyo.types import BrokerValue, Commit, Message, Partition
+from arroyo.processing.strategies.produce import Produce
+from arroyo.processing.strategies.unfold import Unfold
+from arroyo.types import BrokerValue, Commit, Message, Partition, Value
 from sentry_kafka_schemas.codecs import Codec
 from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment
@@ -27,10 +30,7 @@ def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
 def process_message(message: Message[KafkaPayload]):
     value = message.payload.value
     segment = _deserialize_segment(value)
-
-    assert segment["spans"]
-
-    process_segment(segment["spans"])
+    return process_segment(segment["spans"])


 def _process_message(message: Message[KafkaPayload]):
@@ -44,11 +44,18 @@ def _process_message(message: Message[KafkaPayload]):
             op="process", name="spans.process_segments.process_message"
         ):
             sentry_sdk.set_measurement("message_size.bytes", len(message.payload.value))
-            process_message(message)
+            return process_message(message)
     except Exception:
         sentry_sdk.capture_exception()


+def explode_segment(spans: list[dict[str, Any]]):
+    for span in spans:
+        if span is not None:
+            payload = rapidjson.dumps(span)
+            yield Value(KafkaPayload(None, payload, []), {}, None)
+
+
 class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
     def __init__(
         self,
@@ -70,9 +77,17 @@ def create_with_partitions(
         commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
+        produce_step = Produce(
+            producer=self.producer,
+            topic=self.output_topic,
+            next_step=CommitOffsets(commit),
+        )
+
+        unfold_step = Unfold(generator=explode_segment, next_step=produce_step)
+
         return run_task_with_multiprocessing(
             function=_process_message,
-            next_step=CommitOffsets(commit),
+            next_step=unfold_step,
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
             pool=self.pool,
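Net effect of the factory change: segments are processed in a multiprocessing pool, each processed segment is unfolded into one message per span, and the span messages are produced to the output topic ahead of the offset commit (run_task_with_multiprocessing -> Unfold -> Produce -> CommitOffsets). Below is a self-contained sketch of the fan-out contract, using the stdlib json module in place of rapidjson (a simplified illustration, not Sentry code):

    import json  # stand-in for rapidjson; both expose dumps()

    def explode(spans):
        # One input (a processed segment) yields many outputs, one per span;
        # None entries are skipped, mirroring explode_segment above.
        for span in spans:
            if span is not None:
                yield json.dumps(span)

    segment = [{"span_id": "aaaaaaaaaaaaaaaa"}, None, {"span_id": "bbbbbbbbbbbbbbbb"}]
    print(list(explode(segment)))  # two JSON payloads; the None entry is dropped

In the real pipeline each yielded payload is wrapped in a KafkaPayload and handed to Produce, and CommitOffsets runs only as Produce's next step, so offsets are not committed before the span messages are written.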
2 changes: 1 addition & 1 deletion src/sentry/spans/consumers/process_segments/message.py
@@ -240,4 +240,4 @@ def process_segment(spans: list[dict[str, Any]]):
     _send_occurrence_to_platform(jobs, projects)
     _record_transaction_info(jobs, projects)

-    return jobs
+    return spans
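This return-value change feeds the new Unfold step: process_segment now hands the enriched spans back to _process_message, whose return value explode_segment fans out into individual Kafka messages; the internal jobs list stays an implementation detail.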
18 changes: 9 additions & 9 deletions tests/sentry/spans/consumers/process/test_factory.py
@@ -101,7 +101,7 @@ def process_spans_strategy():
 def test_consumer_pushes_to_redis():
     redis_client = get_redis_client()

-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -134,7 +134,7 @@ def test_consumer_pushes_to_redis():
     }
 )
 def test_produces_valid_segment_to_kafka():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     factory = process_spans_strategy()
     with mock.patch.object(
@@ -175,7 +175,7 @@ def test_produces_valid_segment_to_kafka():
     }
 )
 def test_rejects_large_message_size_to_kafka():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     factory = process_spans_strategy()
     with mock.patch.object(
@@ -213,7 +213,7 @@ def test_rejects_large_message_size_to_kafka():
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_disabled(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     mock_commit = mock.Mock()
     strategy = process_spans_strategy().create_with_partitions(
@@ -247,7 +247,7 @@ def test_option_disabled(mock_buffer):
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_project_rollout_rate_discard(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -272,7 +272,7 @@ def test_option_project_rollout_rate_discard(mock_buffer):
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_project_rollout(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -297,7 +297,7 @@ def test_option_project_rollout(mock_buffer):
     }
 )
 def test_commit_and_produce_with_multiple_partitions():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)
     factory = process_spans_strategy()
@@ -373,7 +373,7 @@ def test_commit_and_produce_with_multiple_partitions():
 )
 def test_with_multiple_partitions():
     redis_client = get_redis_client()
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)

@@ -456,7 +456,7 @@ def test_with_multiple_partitions():
 )
 def test_with_expand_segment():
     redis_client = get_redis_client()
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)
