From fabb7cf7ba9617e51b710a1c3cf38e800331a311 Mon Sep 17 00:00:00 2001
From: Jan Michael Auer
Date: Mon, 3 Mar 2025 17:42:17 +0100
Subject: [PATCH] ref(spans): Add an ingest-spans topic for the spans pipeline

---
 src/sentry/conf/server.py                     |  1 +
 src/sentry/conf/types/kafka_definition.py     |  1 +
 src/sentry/consumers/__init__.py              |  2 +-
 src/sentry/spans/consumers/process/factory.py |  2 +-
 .../consumers/process_segments/factory.py     | 29 ++++++++++++++-----
 .../consumers/process_segments/message.py     |  2 +-
 .../spans/consumers/process/test_factory.py   | 18 ++++++------
 7 files changed, 36 insertions(+), 19 deletions(-)

diff --git a/src/sentry/conf/server.py b/src/sentry/conf/server.py
index f6b185253a8e79..f3b7e0395807b6 100644
--- a/src/sentry/conf/server.py
+++ b/src/sentry/conf/server.py
@@ -2954,6 +2954,7 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
     "ingest-transactions": "default",
     "ingest-transactions-dlq": "default",
     "ingest-transactions-backlog": "default",
+    "ingest-spans": "default",
     "ingest-metrics": "default",
     "ingest-metrics-dlq": "default",
     "snuba-metrics": "default",
diff --git a/src/sentry/conf/types/kafka_definition.py b/src/sentry/conf/types/kafka_definition.py
index 2d4a1b52219e71..1825c7ebdbeedb 100644
--- a/src/sentry/conf/types/kafka_definition.py
+++ b/src/sentry/conf/types/kafka_definition.py
@@ -37,6 +37,7 @@ class Topic(Enum):
     INGEST_TRANSACTIONS = "ingest-transactions"
     INGEST_TRANSACTIONS_DLQ = "ingest-transactions-dlq"
     INGEST_TRANSACTIONS_BACKLOG = "ingest-transactions-backlog"
+    INGEST_SPANS = "ingest-spans"
     INGEST_METRICS = "ingest-metrics"
     INGEST_METRICS_DLQ = "ingest-metrics-dlq"
     SNUBA_METRICS = "snuba-metrics"
diff --git a/src/sentry/consumers/__init__.py b/src/sentry/consumers/__init__.py
index f3e5aa93484434..efaee89b0bdf52 100644
--- a/src/sentry/consumers/__init__.py
+++ b/src/sentry/consumers/__init__.py
@@ -442,7 +442,7 @@ def ingest_transactions_options() -> list[click.Option]:
         },
     },
     "process-spans": {
-        "topic": Topic.SNUBA_SPANS,
+        "topic": Topic.INGEST_SPANS,
         "strategy_factory": "sentry.spans.consumers.process.factory.ProcessSpansStrategyFactory",
         "click_options": multiprocessing_options(default_max_batch_size=100),
     },
diff --git a/src/sentry/spans/consumers/process/factory.py b/src/sentry/spans/consumers/process/factory.py
index 2ae1701f3af752..880a11e4677794 100644
--- a/src/sentry/spans/consumers/process/factory.py
+++ b/src/sentry/spans/consumers/process/factory.py
@@ -38,7 +38,7 @@
 logger = logging.getLogger(__name__)
 
-SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.SNUBA_SPANS)
+SPANS_CODEC: Codec[SpanEvent] = get_topic_codec(Topic.INGEST_SPANS)
 MAX_PAYLOAD_SIZE = 10 * 1000 * 1000  # 10 MB
 
 BATCH_SIZE = 100
diff --git a/src/sentry/spans/consumers/process_segments/factory.py b/src/sentry/spans/consumers/process_segments/factory.py
index 41b42622d0e723..10193b8aa77011 100644
--- a/src/sentry/spans/consumers/process_segments/factory.py
+++ b/src/sentry/spans/consumers/process_segments/factory.py
@@ -2,11 +2,14 @@
 from collections.abc import Mapping
 from typing import Any
 
+import rapidjson
 import sentry_sdk
 from arroyo.backends.kafka.consumer import KafkaPayload
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
 from arroyo.processing.strategies.commit import CommitOffsets
-from arroyo.types import BrokerValue, Commit, Message, Partition
+from arroyo.processing.strategies.produce import Produce
+from arroyo.processing.strategies.unfold import Unfold
+from arroyo.types import BrokerValue, Commit, Message, Partition, Value
 from sentry_kafka_schemas.codecs import Codec
 from sentry_kafka_schemas.schema_types.buffered_segments_v1 import BufferedSegment
 
@@ -27,10 +30,7 @@ def _deserialize_segment(value: bytes) -> Mapping[str, Any]:
 def process_message(message: Message[KafkaPayload]):
     value = message.payload.value
     segment = _deserialize_segment(value)
-
-    assert segment["spans"]
-
-    process_segment(segment["spans"])
+    return process_segment(segment["spans"])
 
 
 def _process_message(message: Message[KafkaPayload]):
@@ -44,11 +44,18 @@ def _process_message(message: Message[KafkaPayload]):
             op="process", name="spans.process_segments.process_message"
         ):
             sentry_sdk.set_measurement("message_size.bytes", len(message.payload.value))
-            process_message(message)
+            return process_message(message)
     except Exception:
         sentry_sdk.capture_exception()
 
 
+def explode_segment(spans: list[dict[str, Any]]):
+    for span in spans:
+        if span is not None:
+            payload = rapidjson.dumps(span)
+            yield Value(KafkaPayload(None, payload, []), {}, None)
+
+
 class DetectPerformanceIssuesStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
     def __init__(
         self,
@@ -70,9 +77,17 @@ def create_with_partitions(
         commit: Commit,
         partitions: Mapping[Partition, int],
     ) -> ProcessingStrategy[KafkaPayload]:
+        produce_step = Produce(
+            producer=self.producer,
+            topic=self.output_topic,
+            next_step=CommitOffsets(commit),
+        )
+
+        unfold_step = Unfold(generator=explode_segment, next_step=produce_step)
+
         return run_task_with_multiprocessing(
             function=_process_message,
-            next_step=CommitOffsets(commit),
+            next_step=unfold_step,
             max_batch_size=self.max_batch_size,
             max_batch_time=self.max_batch_time,
             pool=self.pool,
diff --git a/src/sentry/spans/consumers/process_segments/message.py b/src/sentry/spans/consumers/process_segments/message.py
index bccb92cddde1b7..7e72c7f56cfcb1 100644
--- a/src/sentry/spans/consumers/process_segments/message.py
+++ b/src/sentry/spans/consumers/process_segments/message.py
@@ -240,4 +240,4 @@ def process_segment(spans: list[dict[str, Any]]):
     _send_occurrence_to_platform(jobs, projects)
     _record_transaction_info(jobs, projects)
 
-    return jobs
+    return spans
diff --git a/tests/sentry/spans/consumers/process/test_factory.py b/tests/sentry/spans/consumers/process/test_factory.py
index 9a7102f01d6b28..0ca47c78b92333 100644
--- a/tests/sentry/spans/consumers/process/test_factory.py
+++ b/tests/sentry/spans/consumers/process/test_factory.py
@@ -101,7 +101,7 @@ def process_spans_strategy():
 
 def test_consumer_pushes_to_redis():
     redis_client = get_redis_client()
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -134,7 +134,7 @@ def test_consumer_pushes_to_redis():
     }
 )
 def test_produces_valid_segment_to_kafka():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     factory = process_spans_strategy()
     with mock.patch.object(
@@ -175,7 +175,7 @@ def test_produces_valid_segment_to_kafka():
     }
 )
 def test_rejects_large_message_size_to_kafka():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
    partition = Partition(topic, 0)
    factory = process_spans_strategy()
    with mock.patch.object(
@@ -213,7 +213,7 @@ def test_rejects_large_message_size_to_kafka():
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_disabled(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     mock_commit = mock.Mock()
     strategy = process_spans_strategy().create_with_partitions(
@@ -247,7 +247,7 @@ def test_option_disabled(mock_buffer):
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_project_rollout_rate_discard(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -272,7 +272,7 @@ def test_option_project_rollout_rate_discard(mock_buffer):
 )
 @mock.patch("sentry.spans.consumers.process.factory.RedisSpansBuffer")
 def test_option_project_rollout(mock_buffer):
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition = Partition(topic, 0)
     strategy = process_spans_strategy().create_with_partitions(
         commit=mock.Mock(),
@@ -297,7 +297,7 @@ def test_option_project_rollout(mock_buffer):
     }
 )
 def test_commit_and_produce_with_multiple_partitions():
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)
     factory = process_spans_strategy()
@@ -373,7 +373,7 @@ def test_commit_and_produce_with_multiple_partitions():
 )
 def test_with_multiple_partitions():
     redis_client = get_redis_client()
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)
 
@@ -456,7 +456,7 @@ def test_with_multiple_partitions():
 )
 def test_with_expand_segment():
     redis_client = get_redis_client()
-    topic = ArroyoTopic(get_topic_definition(Topic.SNUBA_SPANS)["real_topic_name"])
+    topic = ArroyoTopic(get_topic_definition(Topic.INGEST_SPANS)["real_topic_name"])
     partition_1 = Partition(topic, 0)
     partition_2 = Partition(topic, 1)
 
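
Note for reviewers: the core of this patch is the new strategy chain in
process_segments/factory.py, where run_task_with_multiprocessing(_process_message)
-> Unfold(explode_segment) -> Produce -> CommitOffsets replaces the old
process-and-commit pipeline. Below is a minimal sketch (not part of the patch)
of the fan-out contract that Unfold relies on: `explode` and `fake_segment` are
hypothetical stand-ins, and stdlib json stands in for rapidjson; the patch's
real generator is `explode_segment` above.

import json

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.types import Value


def explode(spans):
    # One output message per span; None entries (dropped spans) are skipped.
    for span in spans:
        if span is not None:
            # The patch serializes with rapidjson; stdlib json is used here
            # only to keep the sketch dependency-free.
            payload = json.dumps(span).encode("utf-8")
            yield Value(KafkaPayload(None, payload, []), {}, None)


# _process_message now returns the processed spans instead of discarding them.
# Unfold feeds that return value through the generator, and Produce writes each
# yielded KafkaPayload to the output topic before offsets reach CommitOffsets.
fake_segment = [{"span_id": "a" * 16}, None, {"span_id": "b" * 16}]
for message in explode(fake_segment):
    print(message.payload)  # two KafkaPayloads; the None span is skipped

Placing CommitOffsets at the end of the chain means offsets are presumably only
committed once the produced span messages have been handed to the producer,
rather than immediately after processing as before.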