
Commit

feat(ingestion-tracing): implement ingestion integration with tracing api
david-leifker committed Feb 24, 2025
1 parent 7bee19c commit d2a664a
Showing 9 changed files with 1,536 additions and 16 deletions.
191 changes: 191 additions & 0 deletions metadata-ingestion/src/datahub/emitter/openapi_emitter.py
@@ -0,0 +1,191 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from datahub.cli.cli_utils import ensure_has_system_metadata
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.openapi_tracer import OpenAPITrace
from datahub.emitter.response_helper import extract_trace_data
from datahub.emitter.rest_emitter import (
    _DATAHUB_EMITTER_TRACE,
    BATCH_INGEST_MAX_PAYLOAD_LENGTH,
    INGEST_MAX_PAYLOAD_BYTES,
    DataHubRestEmitter,
)
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeProposal,
)

logger = logging.getLogger(__name__)


@dataclass
class Chunk:
    items: List[str]
    total_bytes: int = 0

    def add_item(self, item: str) -> bool:
        # The caller decides when a new chunk starts; this always appends the
        # item and tracks the running byte total used for that decision, so a
        # chunk holds at least one item even if that item exceeds the byte limit.
        item_bytes = len(item.encode())
        self.items.append(item)
        self.total_bytes += item_bytes
        return True

    @staticmethod
    def join(chunk: "Chunk") -> str:
        return "[" + ",".join(chunk.items) + "]"


class DataHubOpenApiEmitter(DataHubRestEmitter, OpenAPITrace):
    def __init__(
        self,
        gms_server: str,
        token: Optional[str] = None,
        timeout_sec: Optional[float] = None,
        connect_timeout_sec: Optional[float] = None,
        read_timeout_sec: Optional[float] = None,
        retry_status_codes: Optional[List[int]] = None,
        retry_methods: Optional[List[str]] = None,
        retry_max_times: Optional[int] = None,
        extra_headers: Optional[Dict[str, str]] = None,
        ca_certificate_path: Optional[str] = None,
        client_certificate_path: Optional[str] = None,
        disable_ssl_verification: bool = False,
        default_trace_mode: bool = False,
    ):
        super().__init__(
            gms_server,
            token,
            timeout_sec,
            connect_timeout_sec,
            read_timeout_sec,
            retry_status_codes,
            retry_methods,
            retry_max_times,
            extra_headers,
            ca_certificate_path,
            client_certificate_path,
            disable_ssl_verification,
            default_trace_mode,
        )

    def _to_request(
        self,
        mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
        async_flag: Optional[bool] = None,
        async_default: bool = False,
    ) -> Optional[Tuple[str, List[Dict[str, Any]]]]:
        if mcp.aspect and mcp.aspectName:
            resolved_async_flag = (
                async_flag if async_flag is not None else async_default
            )
            url = f"{self._gms_server}/openapi/v3/entity/{mcp.entityType}?async={'true' if resolved_async_flag else 'false'}"
            ensure_has_system_metadata(mcp)
            aspect_value = pre_json_transform(mcp.aspect.to_obj())
            return (
                url,
                [
                    {
                        "urn": mcp.entityUrn,
                        mcp.aspectName: {
                            "value": aspect_value,
                            "systemMetadata": mcp.systemMetadata.to_obj()
                            if mcp.systemMetadata
                            else None,
                        },
                    }
                ],
            )
        return None

    def emit_mcp(
        self,
        mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
        async_flag: Optional[bool] = None,
        trace_flag: Optional[bool] = None,
        trace_timeout: Optional[timedelta] = None,
    ) -> None:
        request = self._to_request(mcp, async_flag)

        if request:
            response = self._emit_generic_payload(request[0], payload=request[1])

            if self._should_trace(async_flag, trace_flag):
                trace_data = extract_trace_data(response) if response else None
                if trace_data:
                    self.await_status([trace_data], trace_timeout)

    def emit_mcps(
        self,
        mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
        async_flag: Optional[bool] = None,
        trace_flag: Optional[bool] = None,
        trace_timeout: Optional[timedelta] = None,
    ) -> int:
        """Emit a batch of MCPs via the OpenAPI endpoint.

        MCPs are grouped by their entity URL, and each group is split into
        chunks that respect both the maximum payload size
        (INGEST_MAX_PAYLOAD_BYTES) and the maximum item count
        (BATCH_INGEST_MAX_PAYLOAD_LENGTH). The Chunk class tracks the items
        and their accumulated byte size; each item is serialized once with
        json.dumps and reused, a chunk always accepts at least one item even
        if that item alone exceeds the byte limit, and chunks are joined into
        a JSON array with simple string concatenation.

        :param mcps: metadata change proposals to transmit
        :param async_flag: whether to request asynchronous ingestion
        :param trace_flag: whether to wait for async writes to be verified
        :param trace_timeout: maximum time to wait for trace verification
        :return: number of batch requests sent to the server
        """
        if _DATAHUB_EMITTER_TRACE:
            logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}")

        # group by entity url
        batches: Dict[str, List[Chunk]] = defaultdict(
            lambda: [Chunk(items=[])]
        )  # Initialize with one empty Chunk

        for mcp in mcps:
            request = self._to_request(mcp, async_flag, async_default=True)
            if request:
                current_chunk = batches[request[0]][-1]  # Get the last chunk
                # Only serialize once
                serialized_item = json.dumps(request[1][0])
                item_bytes = len(serialized_item.encode())

                # If adding this item would exceed max_bytes, create a new chunk
                # Unless the chunk is empty (always add at least one item)
                if current_chunk.items and (
                    current_chunk.total_bytes + item_bytes > INGEST_MAX_PAYLOAD_BYTES
                    or len(current_chunk.items) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
                ):
                    new_chunk = Chunk(items=[])
                    batches[request[0]].append(new_chunk)
                    current_chunk = new_chunk

                current_chunk.add_item(serialized_item)

        responses = []
        for url, chunks in batches.items():
            for chunk in chunks:
                response = super()._emit_generic(url, payload=Chunk.join(chunk))
                responses.append(response)

        if self._should_trace(async_flag, trace_flag, async_default=True):
            trace_data = []
            for response in responses:
                data = extract_trace_data(response) if response else None
                if data is not None:
                    trace_data.append(data)

            if trace_data:
                self.await_status(trace_data, trace_timeout)

        return len(responses)
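
For context, a minimal usage sketch of the new emitter. The class, module path, and emit_mcps parameters come from the diff above; the server URL, dataset URNs, and the choice of Status aspect are illustrative placeholders, not part of this commit.

from datetime import timedelta

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.openapi_emitter import DataHubOpenApiEmitter
from datahub.metadata.schema_classes import StatusClass

# Placeholder GMS endpoint; point this at a real DataHub instance.
emitter = DataHubOpenApiEmitter("http://localhost:8080", token=None)

# A few illustrative MCPs against hypothetical dataset URNs.
mcps = [
    MetadataChangeProposalWrapper(
        entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:hive,example_table_{i},PROD)",
        aspect=StatusClass(removed=False),
    )
    for i in range(3)
]

# Emit asynchronously, then block until the trace API reports the writes
# as persisted (or the timeout elapses).
emitter.emit_mcps(
    mcps,
    async_flag=True,
    trace_flag=True,
    trace_timeout=timedelta(minutes=10),
)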
111 changes: 111 additions & 0 deletions metadata-ingestion/src/datahub/emitter/openapi_tracer.py
@@ -0,0 +1,111 @@
import logging
import time
from datetime import datetime, timedelta
from typing import Any, List, Optional, Protocol, TypeVar

from requests import Response

from datahub.configuration.common import (
    OperationalError,
)
from datahub.emitter.response_helper import TraceData

logger = logging.getLogger(__name__)

PENDING_STATUS = "PENDING"
INITIAL_BACKOFF = 1.0 # Start with 1 second
MAX_BACKOFF = 300.0 # Cap at 5 minutes
BACKOFF_FACTOR = 2.0 # Double the wait time each attempt


class TracerProtocol(Protocol):
    _gms_server: str

    def _emit_generic_payload(self, url: str, payload: Any) -> Response: ...


T = TypeVar("T", bound=TracerProtocol)


class OpenAPITrace:
    def await_status(
        self: T,
        trace_data: List[TraceData],
        trace_timeout: Optional[timedelta] = timedelta(seconds=3600),
    ) -> None:
        """Verify the status of asynchronous write operations.

        Args:
            trace_data: List of trace data to verify
            trace_timeout: Maximum time to wait for verification.

        Raises:
            OperationalError: If verification fails or times out
        """
        if trace_timeout is None:
            raise ValueError("trace_timeout cannot be None")

        try:
            if not trace_data:
                logger.debug("No trace data to verify")
                return

            start_time = datetime.now()

            for trace in trace_data:
                current_backoff = INITIAL_BACKOFF

                while trace.data:
                    if datetime.now() - start_time > trace_timeout:
                        raise OperationalError(
                            f"Timeout waiting for async write completion after {trace_timeout.total_seconds()} seconds"
                        )

                    base_url = f"{self._gms_server}/openapi/v1/trace/write"
                    url = f"{base_url}/{trace.trace_id}?onlyIncludeErrors=false&detailed=true"

                    response = self._emit_generic_payload(url, payload=trace.data)
                    json_data = response.json()

                    for urn, aspects in json_data.items():
                        for aspect_name, aspect_status in aspects.items():
                            if not aspect_status["success"]:
                                error_msg = (
                                    f"Unable to validate async write to DataHub GMS: "
                                    f"Persistence failure for URN '{urn}' aspect '{aspect_name}'. "
                                    f"Status: {aspect_status}"
                                )
                                raise OperationalError(error_msg, aspect_status)

                            primary_storage = aspect_status["primaryStorage"][
                                "writeStatus"
                            ]
                            search_storage = aspect_status["searchStorage"][
                                "writeStatus"
                            ]

                            # Remove resolved statuses
                            if (
                                primary_storage != PENDING_STATUS
                                and search_storage != PENDING_STATUS
                            ):
                                trace.data[urn].remove(aspect_name)

                        # Remove urns with all statuses resolved
                        if not trace.data[urn]:
                            trace.data.pop(urn)

                    # Adjust backoff based on response
                    if trace.data:
                        # If we still have pending items, increase backoff
                        current_backoff = min(
                            current_backoff * BACKOFF_FACTOR, MAX_BACKOFF
                        )
                        logger.debug(
                            f"Waiting {current_backoff} seconds before next check"
                        )
                        time.sleep(current_backoff)

        except Exception as e:
            logger.error(f"Error during status verification: {str(e)}")
            raise
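
Not part of the commit, but as a rough illustration of the polling cadence: await_status doubles its wait after each check that still finds pending items, starting from INITIAL_BACKOFF and capping at MAX_BACKOFF, so the sleeps run 2, 4, 8, ... seconds up to 300. A minimal sketch of that schedule, using the constants defined above (backoff_schedule is a hypothetical helper, not code from this file):

INITIAL_BACKOFF = 1.0  # Start with 1 second
MAX_BACKOFF = 300.0  # Cap at 5 minutes
BACKOFF_FACTOR = 2.0  # Double the wait time each attempt


def backoff_schedule(checks: int) -> list:
    """Return the sleep durations used after the first `checks` pending polls."""
    waits = []
    current = INITIAL_BACKOFF
    for _ in range(checks):
        # Mirrors await_status: double first, then cap, then sleep.
        current = min(current * BACKOFF_FACTOR, MAX_BACKOFF)
        waits.append(current)
    return waits


print(backoff_schedule(10))
# [2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 300.0, 300.0]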