feat(ingestion-tracing): implement ingestion with tracing api #12714

Open · wants to merge 1 commit into base: master
8 changes: 8 additions & 0 deletions metadata-ingestion/src/datahub/configuration/common.py
@@ -198,6 +198,14 @@ class IgnorableError(MetaError):
"""An error that can be ignored."""


class TraceTimeoutError(OperationalError):
"""Failure to complete an API Trace within the timeout."""


class TraceValidationError(OperationalError):
"""Failure to complete the expected write operation."""


@runtime_checkable
class ExceptionWithProps(Protocol):
def get_telemetry_props(self) -> Dict[str, Any]: ...
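Both new exceptions extend OperationalError, so callers that already handle OperationalError from the REST emitter will also see trace failures without code changes. A standalone sanity sketch of that relationship, assuming only the module's existing OperationalError plus the two classes added here:

from datahub.configuration.common import (
    OperationalError,
    TraceTimeoutError,
    TraceValidationError,
)

# Existing handlers written against OperationalError keep working for trace failures.
assert issubclass(TraceTimeoutError, OperationalError)
assert issubclass(TraceValidationError, OperationalError)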
200 changes: 200 additions & 0 deletions metadata-ingestion/src/datahub/emitter/openapi_emitter.py
@@ -0,0 +1,200 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from datahub.cli.cli_utils import ensure_has_system_metadata
from datahub.emitter.aspect import JSON_CONTENT_TYPE
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.openapi_tracer import OpenAPITrace
from datahub.emitter.response_helper import extract_trace_data
from datahub.emitter.rest_emitter import (
_DATAHUB_EMITTER_TRACE,
BATCH_INGEST_MAX_PAYLOAD_LENGTH,
INGEST_MAX_PAYLOAD_BYTES,
DataHubRestEmitter,
)
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeProposal,
)

logger = logging.getLogger(__name__)


@dataclass
class Chunk:
items: List[str]
total_bytes: int = 0

def add_item(self, item: str) -> bool:
item_bytes = len(item.encode())
if not self.items: # Always add at least one item even if over byte limit
self.items.append(item)
self.total_bytes += item_bytes
return True
self.items.append(item)
self.total_bytes += item_bytes
return True

@staticmethod
def join(chunk: "Chunk") -> str:
return "[" + ",".join(chunk.items) + "]"


class DataHubOpenApiEmitter(DataHubRestEmitter, OpenAPITrace):
def __init__(
self,
gms_server: str,
token: Optional[str] = None,
timeout_sec: Optional[float] = None,
connect_timeout_sec: Optional[float] = None,
read_timeout_sec: Optional[float] = None,
retry_status_codes: Optional[List[int]] = None,
retry_methods: Optional[List[str]] = None,
retry_max_times: Optional[int] = None,
extra_headers: Optional[Dict[str, str]] = None,
ca_certificate_path: Optional[str] = None,
client_certificate_path: Optional[str] = None,
disable_ssl_verification: bool = False,
default_trace_mode: bool = False,
):
super().__init__(
gms_server,
token,
timeout_sec,
connect_timeout_sec,
read_timeout_sec,
retry_status_codes,
retry_methods,
retry_max_times,
extra_headers,
ca_certificate_path,
client_certificate_path,
disable_ssl_verification,
default_trace_mode,
)

def _to_request(
self,
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
async_flag: Optional[bool] = None,
async_default: bool = False,
) -> Optional[Tuple[str, List[Dict[str, Any]]]]:
if mcp.aspect and mcp.aspectName:
resolved_async_flag = (
async_flag if async_flag is not None else async_default
)
url = f"{self._gms_server}/openapi/v3/entity/{mcp.entityType}?async={'true' if resolved_async_flag else 'false'}"
ensure_has_system_metadata(mcp)
if isinstance(mcp, MetadataChangeProposalWrapper):
aspect_value = pre_json_transform(
mcp.to_obj(simplified_structure=True)
)["aspect"]["json"]
else:
obj = mcp.aspect.to_obj()
if obj.get("value") and obj.get("contentType") == JSON_CONTENT_TYPE:
obj = json.loads(obj["value"])
aspect_value = pre_json_transform(obj)

Codecov warning (codecov/patch): added lines #L97-L100 were not covered by tests.
return (
url,
[
{
"urn": mcp.entityUrn,
mcp.aspectName: {
"value": aspect_value,
"systemMetadata": mcp.systemMetadata.to_obj()
if mcp.systemMetadata
else None,
},
}
],
)
return None

Codecov warning (codecov/patch): added line #L115 was not covered by tests.

def emit_mcp(
self,
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
async_flag: Optional[bool] = None,
trace_flag: Optional[bool] = None,
trace_timeout: Optional[timedelta] = None,
) -> None:
request = self._to_request(mcp, async_flag)

if request:
response = self._emit_generic_payload(request[0], payload=request[1])

if self._should_trace(async_flag, trace_flag):
trace_data = extract_trace_data(response) if response else None
if trace_data:
self.await_status([trace_data], trace_timeout)

def emit_mcps(
self,
mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
trace_flag: Optional[bool] = None,
trace_timeout: Optional[timedelta] = None,
) -> int:
"""
1. Grouping MCPs by their entity URL
2. Breaking down large batches into smaller chunks based on both:
* Total byte size (INGEST_MAX_PAYLOAD_BYTES)
* Maximum number of items (BATCH_INGEST_MAX_PAYLOAD_LENGTH)

The Chunk class encapsulates both the items and their byte size tracking
Serializing the items only once with json.dumps(request[1]) and reusing that
The chunking logic handles edge cases (always accepting at least one item per chunk)
The joining logic is efficient with a simple string concatenation

:param mcps: metadata change proposals to transmit
:param async_flag: the mode
:return:
"""
if _DATAHUB_EMITTER_TRACE:
logger.debug(f"Attempting to emit MCP batch of size {len(mcps)}")

Codecov warning (codecov/patch): added line #L157 was not covered by tests.

# group by entity url
batches: Dict[str, List[Chunk]] = defaultdict(
lambda: [Chunk(items=[])]
) # Initialize with one empty Chunk

for mcp in mcps:
request = self._to_request(mcp, async_flag, async_default=True)
if request:
current_chunk = batches[request[0]][-1] # Get the last chunk
# Only serialize once
serialized_item = json.dumps(request[1][0])
item_bytes = len(serialized_item.encode())

# If adding this item would exceed max_bytes, create a new chunk
# Unless the chunk is empty (always add at least one item)
if current_chunk.items and (
current_chunk.total_bytes + item_bytes > INGEST_MAX_PAYLOAD_BYTES
or len(current_chunk.items) >= BATCH_INGEST_MAX_PAYLOAD_LENGTH
):
new_chunk = Chunk(items=[])
batches[request[0]].append(new_chunk)
current_chunk = new_chunk

current_chunk.add_item(serialized_item)

responses = []
for url, chunks in batches.items():
for chunk in chunks:
response = super()._emit_generic(url, payload=Chunk.join(chunk))
responses.append(response)

if self._should_trace(async_flag, trace_flag, async_default=True):
trace_data = []
for response in responses:
data = extract_trace_data(response) if response else None
if data is not None:
trace_data.append(data)

if trace_data:
self.await_status(trace_data, trace_timeout)

return len(responses)
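Putting the pieces together: the emitter groups and chunks a batch as described in the emit_mcps docstring, submits each chunk, and, when tracing is enabled, blocks on trace verification. A minimal usage sketch; the GMS address, token, URNs, and Status aspect are illustrative placeholders rather than anything prescribed by this PR:

import logging
from datetime import timedelta

from datahub.configuration.common import TraceTimeoutError, TraceValidationError
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.openapi_emitter import DataHubOpenApiEmitter
from datahub.metadata.schema_classes import StatusClass

logger = logging.getLogger(__name__)

# Placeholder server and entities; a real GMS URL/token and other aspects work the same way.
emitter = DataHubOpenApiEmitter("http://localhost:8080", token=None)

mcps = [
    MetadataChangeProposalWrapper(
        entityUrn=f"urn:li:dataset:(urn:li:dataPlatform:hive,db.table_{i},PROD)",
        aspect=StatusClass(removed=False),
    )
    for i in range(3)
]

try:
    # Async ingestion with trace verification, bounded to five minutes.
    num_requests = emitter.emit_mcps(
        mcps,
        async_flag=True,
        trace_flag=True,
        trace_timeout=timedelta(minutes=5),
    )
    logger.info(f"Sent {num_requests} chunked request(s)")
except TraceTimeoutError:
    logger.warning("Writes were accepted but not confirmed within the trace timeout")
except TraceValidationError as e:
    logger.error(f"GMS reported a failed write during trace verification: {e}")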
110 changes: 110 additions & 0 deletions metadata-ingestion/src/datahub/emitter/openapi_tracer.py
@@ -0,0 +1,110 @@
import logging
import time
from datetime import datetime, timedelta
from typing import Any, List, Optional, Protocol, TypeVar

from requests import Response

from datahub.configuration.common import TraceTimeoutError, TraceValidationError
from datahub.emitter.response_helper import TraceData

logger = logging.getLogger(__name__)

PENDING_STATUS = "PENDING"
INITIAL_BACKOFF = 1.0 # Start with 1 second
MAX_BACKOFF = 300.0 # Cap at 5 minutes
BACKOFF_FACTOR = 2.0 # Double the wait time each attempt


class TracerProtocol(Protocol):
_gms_server: str

def _emit_generic_payload(self, url: str, payload: Any) -> Response: ...


T = TypeVar("T", bound=TracerProtocol)


class OpenAPITrace:
def await_status(
self: T,
trace_data: List[TraceData],
trace_timeout: Optional[timedelta] = timedelta(seconds=3600),
) -> None:
"""Verify the status of asynchronous write operations.

Args:
trace_data: List of trace data to verify
trace_timeout: Maximum time to wait for verification.

Raises:
TraceTimeoutError: If verification does not complete within trace_timeout
TraceValidationError: If an expected write did not complete successfully
"""
if trace_timeout is None:
raise ValueError("trace_timeout cannot be None")

Codecov warning (codecov/patch): added line #L45 was not covered by tests.

try:
if not trace_data:
logger.debug("No trace data to verify")
return

start_time = datetime.now()

for trace in trace_data:
current_backoff = INITIAL_BACKOFF

while trace.data:
if datetime.now() - start_time > trace_timeout:
raise TraceTimeoutError(
f"Timeout waiting for async write completion after {trace_timeout.total_seconds()} seconds"
)

base_url = f"{self._gms_server}/openapi/v1/trace/write"
url = f"{base_url}/{trace.trace_id}?onlyIncludeErrors=false&detailed=true"

response = self._emit_generic_payload(url, payload=trace.data)
json_data = response.json()

for urn, aspects in json_data.items():
for aspect_name, aspect_status in aspects.items():
if not aspect_status["success"]:
error_msg = (
f"Unable to validate async write to DataHub GMS: "
f"Persistence failure for URN '{urn}' aspect '{aspect_name}'. "
f"Status: {aspect_status}"
)
raise TraceValidationError(error_msg, aspect_status)

primary_storage = aspect_status["primaryStorage"][
"writeStatus"
]
search_storage = aspect_status["searchStorage"][
"writeStatus"
]

# Remove resolved statuses
if (
primary_storage != PENDING_STATUS
and search_storage != PENDING_STATUS
):
trace.data[urn].remove(aspect_name)

# Remove urns with all statuses resolved
if not trace.data[urn]:
trace.data.pop(urn)

# Adjust backoff based on response
if trace.data:
# If we still have pending items, increase backoff
current_backoff = min(
current_backoff * BACKOFF_FACTOR, MAX_BACKOFF
)
logger.debug(
f"Waiting {current_backoff} seconds before next check"
)
time.sleep(current_backoff)

except Exception as e:
logger.error(f"Error during status verification: {str(e)}")
raise
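For intuition about the polling cadence: current_backoff is doubled before each sleep, so a trace that stays PENDING is re-checked after 2 s, then 4 s, 8 s, and so on, up to the 300 s cap. A standalone sketch of that schedule, reusing only the constants defined above:

from typing import List

INITIAL_BACKOFF = 1.0  # same starting value as openapi_tracer.py
MAX_BACKOFF = 300.0    # same 5-minute cap
BACKOFF_FACTOR = 2.0   # same doubling factor

def backoff_schedule(attempts: int) -> List[float]:
    """Sleep intervals used while a trace is still PENDING, mirroring await_status."""
    delays: List[float] = []
    current = INITIAL_BACKOFF
    for _ in range(attempts):
        # await_status multiplies first, then sleeps, so the first wait is 2 s.
        current = min(current * BACKOFF_FACTOR, MAX_BACKOFF)
        delays.append(current)
    return delays

print(backoff_schedule(10))
# [2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 300.0, 300.0]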