feat(sdk): add search client (#12754)
hsheth2 authored Mar 3, 2025
1 parent ccf4412 commit 12eb0cd
Showing 18 changed files with 781 additions and 103 deletions.
12 changes: 10 additions & 2 deletions metadata-ingestion/scripts/avro_codegen.py
@@ -323,6 +323,7 @@ def annotate_aspects(aspects: List[dict], schema_class_file: Path) -> None:
     for aspect in ASPECT_CLASSES
 }}
+from typing import Literal
 from typing_extensions import TypedDict
 class AspectBag(TypedDict, total=False):
@@ -332,6 +333,13 @@ class AspectBag(TypedDict, total=False):
 KEY_ASPECTS: Dict[str, Type[_Aspect]] = {{
     {f",{newline} ".join(f"'{aspect['Aspect']['keyForEntity']}': {aspect['name']}Class" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
 }}
+
+ENTITY_TYPE_NAMES: List[str] = [
+    {f",{newline} ".join(f"'{aspect['Aspect']['keyForEntity']}'" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
+]
+EntityTypeName = Literal[
+    {f",{newline} ".join(f"'{aspect['Aspect']['keyForEntity']}'" for aspect in aspects if aspect["Aspect"].get("keyForEntity"))}
+]
 """
 )

@@ -346,7 +354,7 @@ def write_urn_classes(key_aspects: List[dict], urn_dir: Path) -> None:
 code = """
 # This file contains classes corresponding to entity URNs.
-from typing import ClassVar, List, Optional, Type, TYPE_CHECKING, Union
+from typing import ClassVar, List, Optional, Type, TYPE_CHECKING, Union, Literal
 import functools
 from deprecated.sphinx import deprecated as _sphinx_deprecated
@@ -672,7 +680,7 @@ def generate_urn_class(entity_type: str, key_aspect: dict) -> str:
 from datahub.metadata.schema_classes import {key_aspect_class}
 class {class_name}(_SpecificUrn):
-    ENTITY_TYPE: ClassVar[str] = "{entity_type}"
+    ENTITY_TYPE: ClassVar[Literal["{entity_type}"]] = "{entity_type}"
     _URN_PARTS: ClassVar[int] = {arg_count}
     def __init__(self, {init_args}, *, _allow_coercion: bool = True) -> None:
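With these template changes, the generated schema_classes module gains a plain list of entity type names plus a matching Literal union, and each generated URN class narrows ENTITY_TYPE from str to a single-value Literal. A minimal sketch of what the generated output might look like, with the entity list abbreviated (the real file enumerates every key aspect's entity):

    # Hypothetical excerpt of codegen output -- entity names abbreviated.
    from typing import ClassVar, List, Literal

    ENTITY_TYPE_NAMES: List[str] = ["container", "corpuser", "dataset"]
    EntityTypeName = Literal["container", "corpuser", "dataset"]

    class DatasetUrn(_SpecificUrn):
        # Literal["dataset"] lets a type checker infer the exact entity type
        # from the class, rather than an arbitrary str.
        ENTITY_TYPE: ClassVar[Literal["dataset"]] = "dataset"
        _URN_PARTS: ClassVar[int] = 3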
26 changes: 15 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -16,6 +16,7 @@
     List,
     Literal,
     Optional,
+    Sequence,
     Tuple,
     Type,
     Union,
@@ -42,8 +43,8 @@
 )
 from datahub.ingestion.graph.entity_versioning import EntityVersioningAPI
 from datahub.ingestion.graph.filters import (
+    RawSearchFilterRule,
     RemovedStatusFilter,
-    SearchFilterRule,
     generate_filter,
 )
 from datahub.ingestion.source.state.checkpoint import Checkpoint
@@ -105,7 +106,7 @@ class RelatedEntity:
     via: Optional[str] = None


-def _graphql_entity_type(entity_type: str) -> str:
+def entity_type_to_graphql(entity_type: str) -> str:
     """Convert the entity types into GraphQL "EntityType" enum values."""

     # Hard-coded special cases.
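The rename from _graphql_entity_type to entity_type_to_graphql makes the converter part of the module's public surface. The mapping table itself is outside this hunk, so the outputs below are assumptions for illustration, not taken from the diff:

    # Assumed behavior of the converter (outputs not shown in this hunk).
    entity_type_to_graphql("dataset")   # -> "DATASET"
    entity_type_to_graphql("dataFlow")  # -> "DATA_FLOW"
    entity_type_to_graphql("corpuser")  # -> "CORP_USER"  (a hard-coded special case)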
@@ -797,13 +798,13 @@ def _bulk_fetch_schema_info_by_filter(
         container: Optional[str] = None,
         status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
         batch_size: int = 100,
-        extraFilters: Optional[List[SearchFilterRule]] = None,
+        extraFilters: Optional[List[RawSearchFilterRule]] = None,
     ) -> Iterable[Tuple[str, "GraphQLSchemaMetadata"]]:
         """Fetch schema info for datasets that match all of the given filters.

         :return: An iterable of (urn, schema info) tuple that match the filters.
         """
-        types = [_graphql_entity_type("dataset")]
+        types = [entity_type_to_graphql("dataset")]

         # Add the query default of * if no query is specified.
         query = query or "*"
@@ -865,16 +866,16 @@ def _bulk_fetch_schema_info_by_filter(
     def get_urns_by_filter(
         self,
         *,
-        entity_types: Optional[List[str]] = None,
+        entity_types: Optional[Sequence[str]] = None,
         platform: Optional[str] = None,
         platform_instance: Optional[str] = None,
         env: Optional[str] = None,
         query: Optional[str] = None,
         container: Optional[str] = None,
         status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
         batch_size: int = 10000,
-        extraFilters: Optional[List[SearchFilterRule]] = None,
-        extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None,
+        extraFilters: Optional[List[RawSearchFilterRule]] = None,
+        extra_or_filters: Optional[List[Dict[str, List[RawSearchFilterRule]]]] = None,
     ) -> Iterable[str]:
         """Fetch all urns that match all of the given filters.
@@ -965,8 +966,8 @@ def get_results_by_filter(
         container: Optional[str] = None,
         status: RemovedStatusFilter = RemovedStatusFilter.NOT_SOFT_DELETED,
         batch_size: int = 10000,
-        extra_and_filters: Optional[List[SearchFilterRule]] = None,
-        extra_or_filters: Optional[List[Dict[str, List[SearchFilterRule]]]] = None,
+        extra_and_filters: Optional[List[RawSearchFilterRule]] = None,
+        extra_or_filters: Optional[List[Dict[str, List[RawSearchFilterRule]]]] = None,
         extra_source_fields: Optional[List[str]] = None,
         skip_cache: bool = False,
     ) -> Iterable[dict]:
@@ -1109,15 +1110,18 @@ def _scroll_across_entities(
f"Scrolling to next scrollAcrossEntities page: {scroll_id}"
)

def _get_types(self, entity_types: Optional[List[str]]) -> Optional[List[str]]:
@classmethod
def _get_types(cls, entity_types: Optional[Sequence[str]]) -> Optional[List[str]]:
types: Optional[List[str]] = None
if entity_types is not None:
if not entity_types:
raise ValueError(
"entity_types cannot be an empty list; use None for all entities"
)

types = [_graphql_entity_type(entity_type) for entity_type in entity_types]
types = [
entity_type_to_graphql(entity_type) for entity_type in entity_types
]
return types

def get_latest_pipeline_checkpoint(
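Callers of get_urns_by_filter now pass raw dict rules (RawSearchFilterRule) for extraFilters, and entity_types accepts any Sequence. An illustrative call against the updated signature; the graph handle and filter values below are made up for this sketch:

    from datahub.ingestion.graph.client import DataHubGraph

    graph: DataHubGraph = ...  # e.g. from a pipeline context or config
    urns = graph.get_urns_by_filter(
        entity_types=("dataset",),  # a tuple works now, not just a list
        platform="snowflake",
        extraFilters=[
            # RawSearchFilterRule is a plain dict
            {"field": "origin", "condition": "EQUAL", "values": ["PROD"]},
        ],
    )
    for urn in urns:
        print(urn)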
101 changes: 64 additions & 37 deletions metadata-ingestion/src/datahub/ingestion/graph/filters.py
@@ -1,3 +1,4 @@
+import dataclasses
 import enum
 from typing import Any, Dict, List, Optional

@@ -7,7 +8,31 @@
 )
 from datahub.utilities.urns.urn import guess_entity_type

-SearchFilterRule = Dict[str, Any]
+RawSearchFilterRule = Dict[str, Any]
+
+
+@dataclasses.dataclass
+class SearchFilterRule:
+    field: str
+    condition: str  # TODO: convert to an enum
+    values: List[str]
+    negated: bool = False
+
+    def to_raw(self) -> RawSearchFilterRule:
+        return {
+            "field": self.field,
+            "condition": self.condition,
+            "values": self.values,
+            "negated": self.negated,
+        }
+
+    def negate(self) -> "SearchFilterRule":
+        return SearchFilterRule(
+            field=self.field,
+            condition=self.condition,
+            values=self.values,
+            negated=not self.negated,
+        )


 class RemovedStatusFilter(enum.Enum):
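The new dataclass keeps rule construction typed, while to_raw() produces the dict shape (RawSearchFilterRule) that the search API consumes and negate() flips a rule without mutating it. For example:

    rule = SearchFilterRule(
        field="platform.keyword",
        condition="EQUAL",
        values=["urn:li:dataPlatform:snowflake"],
    )
    rule.to_raw()
    # {"field": "platform.keyword", "condition": "EQUAL",
    #  "values": ["urn:li:dataPlatform:snowflake"], "negated": False}
    rule.negate().to_raw()["negated"]  # True; the original rule is unchanged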
@@ -29,9 +54,9 @@ def generate_filter(
     env: Optional[str],
     container: Optional[str],
     status: RemovedStatusFilter,
-    extra_filters: Optional[List[SearchFilterRule]],
-    extra_or_filters: Optional[List[SearchFilterRule]] = None,
-) -> List[Dict[str, List[SearchFilterRule]]]:
+    extra_filters: Optional[List[RawSearchFilterRule]],
+    extra_or_filters: Optional[List[RawSearchFilterRule]] = None,
+) -> List[Dict[str, List[RawSearchFilterRule]]]:
     """
     Generate a search filter based on the provided parameters.

     :param platform: The platform to filter by.
@@ -43,30 +68,32 @@
     :param extra_or_filters: Extra OR filters to apply. These are combined with
         the AND filters using an OR at the top level.
     """
-    and_filters: List[SearchFilterRule] = []
+    and_filters: List[RawSearchFilterRule] = []

     # Platform filter.
     if platform:
-        and_filters.append(_get_platform_filter(platform))
+        and_filters.append(_get_platform_filter(platform).to_raw())

     # Platform instance filter.
     if platform_instance:
-        and_filters.append(_get_platform_instance_filter(platform, platform_instance))
+        and_filters.append(
+            _get_platform_instance_filter(platform, platform_instance).to_raw()
+        )

     # Browse path v2 filter.
     if container:
-        and_filters.append(_get_container_filter(container))
+        and_filters.append(_get_container_filter(container).to_raw())

     # Status filter.
     status_filter = _get_status_filter(status)
     if status_filter:
-        and_filters.append(status_filter)
+        and_filters.append(status_filter.to_raw())

     # Extra filters.
     if extra_filters:
         and_filters += extra_filters

-    or_filters: List[Dict[str, List[SearchFilterRule]]] = [{"and": and_filters}]
+    or_filters: List[Dict[str, List[RawSearchFilterRule]]] = [{"and": and_filters}]

     # Env filter
     if env:
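The return shape of generate_filter is unchanged: a top-level OR list whose entries are AND groups of raw rules; only the internal construction now goes through SearchFilterRule objects and to_raw(). A sketch of a call and its expected result, assuming only a platform filter and the default soft-delete status:

    filters = generate_filter(
        platform="snowflake",
        platform_instance=None,
        env=None,
        container=None,
        status=RemovedStatusFilter.NOT_SOFT_DELETED,
        extra_filters=None,
    )
    # [{"and": [
    #     {"field": "platform.keyword", "condition": "EQUAL",
    #      "values": ["urn:li:dataPlatform:snowflake"], "negated": False},
    #     {"field": "removed", "condition": "EQUAL",
    #      "values": ["true"], "negated": True},
    # ]}]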
@@ -89,7 +116,7 @@ def generate_filter(
     return or_filters


-def _get_env_filters(env: str) -> List[SearchFilterRule]:
+def _get_env_filters(env: str) -> List[RawSearchFilterRule]:
     # The env filter is a bit more tricky since it's not always stored
     # in the same place in ElasticSearch.
     return [
@@ -125,19 +152,19 @@ def _get_status_filter(status: RemovedStatusFilter) -> Optional[SearchFilterRule
         # removed field is simply not present in the ElasticSearch document. Ideally this
         # would be a "removed" : "false" filter, but that doesn't work. Instead, we need to
         # use a negated filter.
-        return {
-            "field": "removed",
-            "values": ["true"],
-            "condition": "EQUAL",
-            "negated": True,
-        }
+        return SearchFilterRule(
+            field="removed",
+            values=["true"],
+            condition="EQUAL",
+            negated=True,
+        )

     elif status == RemovedStatusFilter.ONLY_SOFT_DELETED:
-        return {
-            "field": "removed",
-            "values": ["true"],
-            "condition": "EQUAL",
-        }
+        return SearchFilterRule(
+            field="removed",
+            values=["true"],
+            condition="EQUAL",
+        )

     elif status == RemovedStatusFilter.ALL:
         # We don't need to add a filter for this case.
@@ -152,11 +179,11 @@ def _get_container_filter(container: str) -> SearchFilterRule:
     if guess_entity_type(container) != "container":
         raise ValueError(f"Invalid container urn: {container}")

-    return {
-        "field": "browsePathV2",
-        "values": [container],
-        "condition": "CONTAIN",
-    }
+    return SearchFilterRule(
+        field="browsePathV2",
+        values=[container],
+        condition="CONTAIN",
+    )


 def _get_platform_instance_filter(
@@ -171,16 +198,16 @@ def _get_platform_instance_filter(
     if guess_entity_type(platform_instance) != "dataPlatformInstance":
         raise ValueError(f"Invalid data platform instance urn: {platform_instance}")

-    return {
-        "field": "platformInstance",
-        "values": [platform_instance],
-        "condition": "EQUAL",
-    }
+    return SearchFilterRule(
+        field="platformInstance",
+        condition="EQUAL",
+        values=[platform_instance],
+    )


 def _get_platform_filter(platform: str) -> SearchFilterRule:
-    return {
-        "field": "platform.keyword",
-        "values": [make_data_platform_urn(platform)],
-        "condition": "EQUAL",
-    }
+    return SearchFilterRule(
+        field="platform.keyword",
+        condition="EQUAL",
+        values=[make_data_platform_urn(platform)],
+    )
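The soft-delete handling is the reason SearchFilterRule carries a negated flag: live entities may lack the removed field entirely in the ElasticSearch document, so NOT_SOFT_DELETED has to be expressed as "not removed=true" rather than "removed=false". A quick check of the helper (the ALL branch presumably returns None, per the comment in its hunk):

    _get_status_filter(RemovedStatusFilter.NOT_SOFT_DELETED).to_raw()
    # {"field": "removed", "condition": "EQUAL", "values": ["true"], "negated": True}
    _get_status_filter(RemovedStatusFilter.ALL)  # None -> no rule is added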
[file path not captured]
@@ -59,9 +59,9 @@
     UpstreamLineageClass,
     ViewPropertiesClass,
 )
-from datahub.sdk._entity import Entity
 from datahub.sdk.container import Container
 from datahub.sdk.dataset import Dataset
+from datahub.sdk.entity import Entity

 logger = logging.getLogger(__name__)

1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/sdk/__init__.py
@@ -20,6 +20,7 @@
 from datahub.sdk.container import Container
 from datahub.sdk.dataset import Dataset
 from datahub.sdk.main_client import DataHubClient
+from datahub.sdk.search_filters import Filter, FilterDsl

 # We want to print out the warning if people do `from datahub.sdk import X`.
 # But we don't want to print out warnings if they're doing a more direct
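This export is the user-facing surface of the new search client. The client itself is not shown in this excerpt, so the snippet below is a hypothetical sketch: the client.search attribute, its get_urns method, and the FilterDsl helpers (platform, entity_type, and_) are assumptions about the new API rather than code from this diff:

    from datahub.sdk import DataHubClient, FilterDsl as F

    client = DataHubClient(server="http://localhost:8080", token="...")  # hypothetical constructor args
    # Hypothetical: find Snowflake datasets with the new search client.
    for urn in client.search.get_urns(
        filter=F.and_(F.platform("snowflake"), F.entity_type("dataset")),
    ):
        print(urn)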
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/sdk/_all_entities.py
@@ -1,8 +1,8 @@
 from typing import Dict, List, Type

-from datahub.sdk._entity import Entity
 from datahub.sdk.container import Container
 from datahub.sdk.dataset import Dataset
+from datahub.sdk.entity import Entity

 # TODO: Is there a better way to declare this?
 ENTITY_CLASSES_LIST: List[Type[Entity]] = [
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/sdk/_shared.py
@@ -36,8 +36,8 @@
     TagUrn,
     Urn,
 )
-from datahub.sdk._entity import Entity
 from datahub.sdk._utils import add_list_unique, remove_list_unique
+from datahub.sdk.entity import Entity
 from datahub.utilities.urns.error import InvalidUrnError

 if TYPE_CHECKING:
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/sdk/container.py
@@ -16,7 +16,6 @@
     ContainerUrn,
     Urn,
 )
-from datahub.sdk._entity import Entity, ExtraAspectsType
 from datahub.sdk._shared import (
     DomainInputType,
     HasContainer,
@@ -33,6 +32,7 @@
     make_time_stamp,
     parse_time_stamp,
 )
+from datahub.sdk.entity import Entity, ExtraAspectsType
 from datahub.utilities.sentinels import Auto, auto

2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/sdk/dataset.py
@@ -18,7 +18,6 @@
 from datahub.ingestion.source.sql.sql_types import resolve_sql_type
 from datahub.metadata.urns import DatasetUrn, SchemaFieldUrn, Urn
 from datahub.sdk._attribution import is_ingestion_attribution
-from datahub.sdk._entity import Entity, ExtraAspectsType
 from datahub.sdk._shared import (
     DatasetUrnOrStr,
     DomainInputType,
@@ -39,6 +38,7 @@
     parse_time_stamp,
 )
 from datahub.sdk._utils import add_list_unique, remove_list_unique
+from datahub.sdk.entity import Entity, ExtraAspectsType
 from datahub.utilities.sentinels import Unset, unset

 SchemaFieldInputType: TypeAlias = Union[
[file path not captured]
@@ -56,6 +56,10 @@ def _init_from_graph(self, current_aspects: models.AspectBag) -> Self:
     @abc.abstractmethod
     def get_urn_type(cls) -> Type[_SpecificUrn]: ...

+    @classmethod
+    def entity_type_name(cls) -> str:
+        return cls.get_urn_type().ENTITY_TYPE
+
     @property
     def urn(self) -> _SpecificUrn:
         return self._urn
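Combined with the codegen change that narrows ENTITY_TYPE to a Literal, every SDK entity class can now report its entity type name with a precise static type. Assuming Dataset.get_urn_type() returns DatasetUrn, as the SDK's structure suggests:

    from datahub.sdk import Dataset

    # DatasetUrn.ENTITY_TYPE is Literal["dataset"], so type checkers see the
    # exact string rather than an arbitrary str.
    Dataset.entity_type_name()  # -> "dataset"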
[remaining changed files not loaded in this capture]
