Merge branch 'master' into peter/superset-iterating-over-int-fix
PeteMango authored Mar 6, 2025
2 parents 75d4d97 + a646185 commit b53af04
Showing 27 changed files with 1,311 additions and 255 deletions.
15 changes: 8 additions & 7 deletions build.gradle
@@ -60,6 +60,7 @@ buildscript {
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.25.0'
ext.logbackClassicJava8 = '1.2.12'
ext.awsSdk2Version = '2.30.33'

ext.docker_registry = 'acryldata'

@@ -120,12 +121,12 @@ project.ext.externalDependency = [
'assertJ': 'org.assertj:assertj-core:3.11.1',
'avro': 'org.apache.avro:avro:1.11.4',
'avroCompiler': 'org.apache.avro:avro-compiler:1.11.4',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.17',
'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:2.0.3',
'awsS3': 'software.amazon.awssdk:s3:2.26.21',
'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.13',
'awsPostgresIamAuth': 'software.amazon.jdbc:aws-advanced-jdbc-wrapper:1.0.2',
'awsRds':'software.amazon.awssdk:rds:2.18.24',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.23',
'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:2.3.0',
'awsS3': "software.amazon.awssdk:s3:$awsSdk2Version",
'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.15',
'awsPostgresIamAuth': 'software.amazon.jdbc:aws-advanced-jdbc-wrapper:2.5.4',
'awsRds':"software.amazon.awssdk:rds:$awsSdk2Version",
'cacheApi': 'javax.cache:cache-api:1.1.0',
'commonsCli': 'commons-cli:commons-cli:1.5.0',
'commonsIo': 'commons-io:commons-io:2.17.0',
@@ -240,7 +241,7 @@ project.ext.externalDependency = [
'playFilters': "com.typesafe.play:filters-helpers_$playScalaVersion:$playVersion",
'pac4j': 'org.pac4j:pac4j-oidc:6.0.6',
'playPac4j': "org.pac4j:play-pac4j_$playScalaVersion:12.0.0-PLAY2.8",
'postgresql': 'org.postgresql:postgresql:42.7.4',
'postgresql': 'org.postgresql:postgresql:42.7.5',
'protobuf': 'com.google.protobuf:protobuf-java:3.25.5',
'grpcProtobuf': 'io.grpc:grpc-protobuf:1.53.0',
'rangerCommons': 'org.apache.ranger:ranger-plugins-common:2.3.0',
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
@@ -30,6 +30,8 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

- #12716: Fix the `platform_instance` being added twice to the URN. If you want the previous behavior back, add your `platform_instance` twice (i.e. `plat.plat`).

- #12797: Previously, endpoints used in ASYNC mode did not immediately validate URNs or entity and aspect names. Starting with this release, such requests are rejected with HTTP status 400 even in ASYNC mode.


### Known Issues

@@ -1,9 +1,11 @@
package com.linkedin.metadata.aspect.batch;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.patch.template.AspectTemplateEngine;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import java.util.Collections;
@@ -82,4 +84,6 @@ static boolean supportsPatch(AspectSpec aspectSpec) {
}
return true;
}

default void validate(Urn urn, String aspectName, EntityRegistry entityRegistry) {}
}
91 changes: 72 additions & 19 deletions metadata-ingestion/src/datahub/cli/check_cli.py
@@ -5,7 +5,8 @@
import pprint
import shutil
import tempfile
from typing import Dict, List, Optional, Union
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

import click

@@ -20,7 +21,10 @@
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.telemetry import telemetry
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedList
from datahub.utilities.file_backed_collections import (
ConnectionWrapper,
FileBackedDict,
)

logger = logging.getLogger(__name__)

@@ -391,29 +395,78 @@ def test_path_spec(config: str, input: str, path_spec_key: str) -> None:
raise e


def _jsonify(data: Any) -> Any:
if dataclasses.is_dataclass(data):
# dataclasses.asdict() is recursive. We're doing the recursion
# manually here via _jsonify calls, so we can't use
# dataclasses.asdict() here.
return {
f.name: _jsonify(getattr(data, f.name)) for f in dataclasses.fields(data)
}
elif isinstance(data, list):
return [_jsonify(item) for item in data]
elif isinstance(data, dict):
return {_jsonify(k): _jsonify(v) for k, v in data.items()}
elif isinstance(data, datetime):
return data.isoformat()
else:
return data


@check.command()
@click.argument("query-log-file", type=click.Path(exists=True, dir_okay=False))
@click.option("--output", type=click.Path())
def extract_sql_agg_log(query_log_file: str, output: Optional[str]) -> None:
@click.argument("db-file", type=click.Path(exists=True, dir_okay=False))
def extract_sql_agg_log(db_file: str) -> None:
"""Convert a sqlite db generated by the SqlParsingAggregator into a JSON."""

from datahub.sql_parsing.sql_parsing_aggregator import LoggedQuery
if pathlib.Path(db_file).suffix != ".db":
raise click.UsageError("DB file must be a sqlite db")

output_dir = pathlib.Path(db_file).with_suffix("")
output_dir.mkdir(exist_ok=True)

shared_connection = ConnectionWrapper(pathlib.Path(db_file))

tables: List[str] = [
row[0]
for row in shared_connection.execute(
"""\
SELECT
name
FROM
sqlite_schema
WHERE
type ='table' AND
name NOT LIKE 'sqlite_%';
""",
parameters={},
)
]
logger.info(f"Extracting {len(tables)} tables from {db_file}: {tables}")

for table in tables:
table_output_path = output_dir / f"{table}.json"
if table_output_path.exists():
logger.info(f"Skipping {table_output_path} because it already exists")
continue

assert dataclasses.is_dataclass(LoggedQuery)
# Some of the tables might actually be FileBackedList. Because
# the list is built on top of the FileBackedDict, we don't
# need to distinguish between the two cases.

shared_connection = ConnectionWrapper(pathlib.Path(query_log_file))
query_log = FileBackedList[LoggedQuery](
shared_connection=shared_connection, tablename="stored_queries"
)
logger.info(f"Extracting {len(query_log)} queries from {query_log_file}")
queries = [dataclasses.asdict(query) for query in query_log]
table_data: FileBackedDict[Any] = FileBackedDict(
shared_connection=shared_connection, tablename=table
)

if output:
with open(output, "w") as f:
json.dump(queries, f, indent=2, default=str)
logger.info(f"Extracted {len(queries)} queries to {output}")
else:
click.echo(json.dumps(queries, indent=2))
data = {}
with click.progressbar(
table_data.items(), length=len(table_data), label=f"Extracting {table}"
) as items:
for k, v in items:
data[k] = _jsonify(v)

with open(table_output_path, "w") as f:
json.dump(data, f, indent=2, default=str)
logger.info(f"Extracted {len(data)} entries to {table_output_path}")


@check.command()
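As an aside on the `_jsonify` helper added above: a minimal, self-contained sketch of the same idea (the `ExampleQuery` dataclass is a hypothetical stand-in, not a DataHub type), showing why the recursion is done field-by-field rather than with `dataclasses.asdict()`: manual recursion lets leaf values such as datetimes be converted to ISO strings along the way.

```python
import dataclasses
from datetime import datetime, timezone
from typing import Any


def _jsonify(data: Any) -> Any:
    # Mirrors the helper above: recurse manually so that nested values
    # (e.g. datetimes inside dataclasses) are converted too, which a single
    # dataclasses.asdict() call would not do for us.
    if dataclasses.is_dataclass(data):
        return {
            f.name: _jsonify(getattr(data, f.name)) for f in dataclasses.fields(data)
        }
    elif isinstance(data, list):
        return [_jsonify(item) for item in data]
    elif isinstance(data, dict):
        return {_jsonify(k): _jsonify(v) for k, v in data.items()}
    elif isinstance(data, datetime):
        return data.isoformat()
    return data


@dataclasses.dataclass
class ExampleQuery:  # hypothetical stand-in for a stored row type
    query: str
    timestamp: datetime


print(_jsonify(ExampleQuery("select 1", datetime(2024, 1, 1, tzinfo=timezone.utc))))
# -> {'query': 'select 1', 'timestamp': '2024-01-01T00:00:00+00:00'}
```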
@@ -6,7 +6,10 @@
from humanfriendly import format_timespan
from pydantic import Field, validator
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.rest import RestCatalog
from requests.adapters import HTTPAdapter
from sortedcontainers import SortedList
from urllib3.util import Retry

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
@@ -26,6 +29,23 @@

logger = logging.getLogger(__name__)

DEFAULT_REST_TIMEOUT = 120
DEFAULT_REST_RETRY_POLICY = {"total": 3, "backoff_factor": 0.1}


class TimeoutHTTPAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
if "timeout" in kwargs:
self.timeout = kwargs["timeout"]
del kwargs["timeout"]
super().__init__(*args, **kwargs)

def send(self, request, **kwargs):
timeout = kwargs.get("timeout")
if timeout is None and hasattr(self, "timeout"):
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)


class IcebergProfilingConfig(ConfigModel):
enabled: bool = Field(
@@ -146,7 +166,26 @@ def get_catalog(self) -> Catalog:
logger.debug(
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
)
return load_catalog(name=catalog_name, **catalog_config)
catalog = load_catalog(name=catalog_name, **catalog_config)
if isinstance(catalog, RestCatalog):
logger.debug(
"Recognized REST catalog type being configured, attempting to configure HTTP Adapter for the session"
)
retry_policy: Dict[str, Any] = DEFAULT_REST_RETRY_POLICY.copy()
retry_policy.update(catalog_config.get("connection", {}).get("retry", {}))
retries = Retry(**retry_policy)
logger.debug(f"Retry policy to be set: {retry_policy}")
timeout = catalog_config.get("connection", {}).get(
"timeout", DEFAULT_REST_TIMEOUT
)
logger.debug(f"Timeout to be set: {timeout}")
catalog._session.mount(
"http://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries)
)
catalog._session.mount(
"https://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries)
)
return catalog


class TopTableTimings:
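For context, a minimal sketch of the pattern introduced above, independent of DataHub and pyiceberg: a `requests` adapter that injects a default timeout, mounted on a session together with a urllib3 `Retry` policy. The timeout and retry values simply echo the defaults added in this change.

```python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util import Retry


class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTPAdapter that applies a default timeout when the caller passes none."""

    def __init__(self, *args, timeout=None, **kwargs):
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        if kwargs.get("timeout") is None and self.timeout is not None:
            kwargs["timeout"] = self.timeout
        return super().send(request, **kwargs)


session = requests.Session()
retries = Retry(total=3, backoff_factor=0.1)  # matches DEFAULT_REST_RETRY_POLICY above
adapter = TimeoutHTTPAdapter(timeout=120, max_retries=retries)  # matches DEFAULT_REST_TIMEOUT
session.mount("http://", adapter)
session.mount("https://", adapter)
# Every request made through this session now retries transient failures and
# falls back to the 120s timeout unless an explicit timeout is supplied.
```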
@@ -1,10 +1,10 @@
[
{
{
"0": {
"query": "create table foo as select a, b from bar",
"session_id": null,
"timestamp": null,
"user": null,
"default_db": "dev",
"default_schema": "public"
}
]
}
37 changes: 23 additions & 14 deletions metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -41,14 +41,13 @@ def _ts(ts: int) -> datetime:
return datetime.fromtimestamp(ts, tz=timezone.utc)


@freeze_time(FROZEN_TIME)
def test_basic_lineage(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> None:
def make_basic_aggregator(store: bool = False) -> SqlParsingAggregator:
aggregator = SqlParsingAggregator(
platform="redshift",
generate_lineage=True,
generate_usage_statistics=False,
generate_operations=False,
query_log=QueryLogSetting.STORE_ALL,
query_log=QueryLogSetting.STORE_ALL if store else QueryLogSetting.DISABLED,
)

aggregator.add_observed_query(
@@ -59,26 +58,36 @@ def test_basic_lineage(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> None:
)
)

return aggregator


@freeze_time(FROZEN_TIME)
def test_basic_lineage(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> None:
aggregator = make_basic_aggregator()
mcps = list(aggregator.gen_metadata())

check_goldens_stream(
outputs=mcps,
golden_path=RESOURCE_DIR / "test_basic_lineage.json",
)

# This test also validates the query log storage functionality.

@freeze_time(FROZEN_TIME)
def test_aggregator_dump(pytestconfig: pytest.Config, tmp_path: pathlib.Path) -> None:
# Validates the query log storage + extraction functionality.
aggregator = make_basic_aggregator(store=True)
aggregator.close()

query_log_db = aggregator.report.query_log_path
query_log_json = tmp_path / "query_log.json"
run_datahub_cmd(
[
"check",
"extract-sql-agg-log",
str(query_log_db),
"--output",
str(query_log_json),
]
)
assert query_log_db is not None

run_datahub_cmd(["check", "extract-sql-agg-log", query_log_db])

output_json_dir = pathlib.Path(query_log_db).with_suffix("")
assert (
len(list(output_json_dir.glob("*.json"))) > 5
) # 5 is arbitrary, but should have at least a couple tables
query_log_json = output_json_dir / "stored_queries.json"
mce_helpers.check_golden_file(
pytestconfig, query_log_json, RESOURCE_DIR / "test_basic_lineage_query_log.json"
)
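A hedged usage sketch of the workflow this test exercises (the `.db` path is illustrative; the command name and the output-directory convention come from the `check_cli.py` change above):

```python
import json
import pathlib
import subprocess

# Run the reworked command against a sqlite db captured from the
# SqlParsingAggregator; the db path below is illustrative.
db_file = pathlib.Path("sql_agg_log.db")
subprocess.run(["datahub", "check", "extract-sql-agg-log", str(db_file)], check=True)

# Per the change above, output lands in a sibling directory named after the
# db file (suffix stripped), with one JSON file per sqlite table.
output_dir = db_file.with_suffix("")
for table_json in sorted(output_dir.glob("*.json")):
    rows = json.loads(table_json.read_text())
    print(f"{table_json.name}: {len(rows)} entries")
```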
@@ -14,7 +14,6 @@
import com.linkedin.metadata.aspect.plugins.hooks.MutationHook;
import com.linkedin.metadata.aspect.plugins.validation.ValidationExceptionCollection;
import com.linkedin.metadata.entity.validation.ValidationException;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.util.Pair;
import java.util.ArrayList;
@@ -153,10 +152,11 @@ private Stream<? extends BatchItem> proposedItemsToChangeItemStream(List<MCPItem

private static BatchItem patchDiscriminator(MCPItem mcpItem, AspectRetriever aspectRetriever) {
if (ChangeType.PATCH.equals(mcpItem.getChangeType())) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcpItem.getMetadataChangeProposal(),
mcpItem.getAuditStamp(),
aspectRetriever.getEntityRegistry());
}
return ChangeItemImpl.builder()
.build(mcpItem.getMetadataChangeProposal(), mcpItem.getAuditStamp(), aspectRetriever);
@@ -195,22 +195,18 @@ public AspectsBatchImplBuilder mcps(
mcp -> {
try {
if (alternateMCPValidation) {
EntitySpec entitySpec =
retrieverContext
.getAspectRetriever()
.getEntityRegistry()
.getEntitySpec(mcp.getEntityType());
return ProposedItem.builder()
.metadataChangeProposal(mcp)
.entitySpec(entitySpec)
.auditStamp(auditStamp)
.build();
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
}
if (mcp.getChangeType().equals(ChangeType.PATCH)) {
return PatchItemImpl.PatchItemImplBuilder.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
return PatchItemImpl.builder()
.build(
mcp,
auditStamp,
retrieverContext.getAspectRetriever().getEntityRegistry());
} else {
return ChangeItemImpl.builder()
.build(mcp, auditStamp, retrieverContext.getAspectRetriever());