feat(ingest): allowdenypattern for dashboard, chart, dataset in superset (#12782)
kevinkarchacryl authored Mar 4, 2025
1 parent 9e7f482 commit f327981
Showing 4 changed files with 84 additions and 8 deletions.
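
The new filters are standard DataHub AllowDenyPattern fields, so they can be supplied like any other source-level pattern. A minimal sketch of setting them when building the config programmatically (the regex values are invented for illustration, and it assumes SupersetConfig's other fields keep their defaults; only the field names come from this commit):

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.superset import SupersetConfig

config = SupersetConfig.parse_obj(
    {
        # keep only dashboards whose titles start with "Sales", drop drafts
        "dashboard_pattern": {"allow": ["Sales.*"], "deny": [".*draft.*"]},
        # ingest every chart except ad-hoc scratch charts
        "chart_pattern": {"deny": ["scratch_.*"]},
        # omitting a pattern leaves it at AllowDenyPattern.allow_all()
        "dataset_pattern": AllowDenyPattern.allow_all(),
    }
)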
11 changes: 7 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/preset.py
@@ -16,10 +16,13 @@
support_status,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource
from datahub.ingestion.source.superset import (
SupersetConfig,
SupersetSource,
SupersetSourceReport,
)
from datahub.utilities import config_clean

logger = logging.getLogger(__name__)
@@ -76,15 +79,15 @@ class PresetSource(SupersetSource):
"""

config: PresetConfig
report: StaleEntityRemovalSourceReport
report: SupersetSourceReport
platform = "preset"

def __init__(self, ctx: PipelineContext, config: PresetConfig):
logger.info(f"ctx is {ctx}")

super().__init__(ctx, config)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.report = SupersetSourceReport()
self.platform = "preset"

def login(self):
73 changes: 69 additions & 4 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -1,5 +1,6 @@
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional
@@ -80,6 +81,7 @@
UpstreamLineageClass,
)
from datahub.utilities import config_clean
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry

logger = logging.getLogger(__name__)
@@ -107,6 +109,14 @@
platform_without_databases = ["druid"]


@dataclass
class SupersetSourceReport(StaleEntityRemovalSourceReport):
filtered: LossyList[str] = field(default_factory=LossyList)

def report_dropped(self, name: str) -> None:
self.filtered.append(name)


class SupersetDataset(BaseModel):
id: int
table_name: str
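
The report subclass added above just collects the names of filtered assets; LossyList keeps a bounded sample of entries, so the report stays compact even when many assets are dropped. A rough usage sketch with hypothetical messages:

report = SupersetSourceReport()
report.report_dropped("Dashboard 'Old KPIs' (id: 42) filtered by dashboard_pattern")
report.report_dropped("Dataset 'tmp_orders' filtered by dataset_pattern")
assert len(report.filtered) == 2  # surfaced in the ingestion run summary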
Expand Down Expand Up @@ -142,6 +152,18 @@ class SupersetConfig(
default=dict(),
description="regex patterns for tables to filter to assign domain_key. ",
)
dataset_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for dataset to filter in ingestion.",
)
chart_pattern: AllowDenyPattern = Field(
AllowDenyPattern.allow_all(),
description="Patterns for selecting chart names that are to be included",
)
dashboard_pattern: AllowDenyPattern = Field(
AllowDenyPattern.allow_all(),
description="Patterns for selecting dashboard names that are to be included",
)
username: Optional[str] = Field(default=None, description="Superset username.")
password: Optional[str] = Field(default=None, description="Superset password.")
# Configuration for stateful ingestion
@@ -222,7 +244,7 @@ class SupersetSource(StatefulIngestionSourceBase):
"""

config: SupersetConfig
report: StaleEntityRemovalSourceReport
report: SupersetSourceReport
platform = "superset"

def __hash__(self):
@@ -231,7 +253,7 @@ def __hash__(self):
def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.report = SupersetSourceReport()
if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
@@ -449,6 +471,15 @@ def construct_dashboard_from_api_data(
def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
for dashboard_data in self.paginate_entity_api_results("dashboard/", PAGE_SIZE):
try:
dashboard_id = str(dashboard_data.get("id"))
dashboard_title = dashboard_data.get("dashboard_title", "")

if not self.config.dashboard_pattern.allowed(dashboard_title):
self.report.report_dropped(
f"Dashboard '{dashboard_title}' (id: {dashboard_id}) filtered by dashboard_pattern"
)
continue

dashboard_snapshot = self.construct_dashboard_from_api_data(
dashboard_data
)
@@ -461,7 +492,7 @@ def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
mce = MetadataChangeEvent(proposedSnapshot=dashboard_snapshot)
yield MetadataWorkUnit(id=dashboard_snapshot.urn, mce=mce)
yield from self._get_domain_wu(
title=dashboard_data.get("dashboard_title", ""),
title=dashboard_title,
entity_urn=dashboard_snapshot.urn,
)
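
The dashboard check above relies on AllowDenyPattern.allowed(), which (roughly) rejects a name matching any deny regex and otherwise accepts it only if it matches an allow regex. An illustration with invented titles:

from datahub.configuration.common import AllowDenyPattern

pattern = AllowDenyPattern(allow=["Sales.*"], deny=[".*WIP.*"])
assert pattern.allowed("Sales Overview")        # ingested
assert not pattern.allowed("Sales WIP v2")      # counted via report_dropped
assert not pattern.allowed("Marketing Funnel")  # matches no allow pattern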

@@ -569,12 +600,37 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]:
for chart_data in self.paginate_entity_api_results("chart/", PAGE_SIZE):
try:
chart_id = str(chart_data.get("id"))
chart_name = chart_data.get("slice_name", "")

if not self.config.chart_pattern.allowed(chart_name):
self.report.report_dropped(
f"Chart '{chart_name}' (id: {chart_id}) filtered by chart_pattern"
)
continue

# Emit a warning if charts use data from a dataset that will be filtered out
if self.config.dataset_pattern != AllowDenyPattern.allow_all():
datasource_id = chart_data.get("datasource_id")
if datasource_id:
dataset_response = self.get_dataset_info(datasource_id)
dataset_name = dataset_response.get("result", {}).get(
"table_name", ""
)

if dataset_name and not self.config.dataset_pattern.allowed(
dataset_name
):
self.report.warning(
f"Chart '{chart_name}' (id: {chart_id}) uses dataset '{dataset_name}' which is filtered by dataset_pattern"
)

chart_snapshot = self.construct_chart_from_chart_data(chart_data)

mce = MetadataChangeEvent(proposedSnapshot=chart_snapshot)
except Exception as e:
self.report.warning(
f"Failed to construct chart snapshot. Chart name: {chart_data.get('table_name')}. Error: \n{e}"
f"Failed to construct chart snapshot. Chart name: {chart_name}. Error: \n{e}"
)
continue
# Emit the chart
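
Note that a restrictive dataset_pattern does not drop charts here: a chart whose datasource resolves to a filtered dataset is still emitted, and the mismatch only produces a warning in the report. A sketch of the comparison the code performs, with hypothetical dataset names:

from datahub.configuration.common import AllowDenyPattern

dataset_pattern = AllowDenyPattern(deny=["staging_.*"])
assert dataset_pattern != AllowDenyPattern.allow_all()  # the per-chart check runs
assert not dataset_pattern.allowed("staging_orders")    # warning, chart still emitted
assert dataset_pattern.allowed("orders")                # no warning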
@@ -716,6 +772,15 @@ def construct_dataset_from_dataset_data(
def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
for dataset_data in self.paginate_entity_api_results("dataset/", PAGE_SIZE):
try:
dataset_name = dataset_data.get("table_name", "")

# Check if dataset should be filtered by dataset name
if not self.config.dataset_pattern.allowed(dataset_name):
self.report.report_dropped(
f"Dataset '{dataset_name}' filtered by dataset_pattern"
)
continue

dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
)
4 changes: 4 additions & 0 deletions metadata-ingestion/tests/unit/test_preset_source.py
@@ -1,3 +1,4 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.source.preset import PresetConfig


@@ -10,6 +11,9 @@ def test_default_values():
assert config.env == "PROD"
assert config.api_key is None
assert config.api_secret is None
assert config.dataset_pattern == AllowDenyPattern.allow_all()
assert config.chart_pattern == AllowDenyPattern.allow_all()
assert config.dashboard_pattern == AllowDenyPattern.allow_all()


def test_set_display_uri():
4 changes: 4 additions & 0 deletions metadata-ingestion/tests/unit/test_superset_source.py
@@ -1,3 +1,4 @@
from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.superset import SupersetConfig, SupersetSource

@@ -11,6 +12,9 @@ def test_default_values():
assert config.env == "PROD"
assert config.username is None
assert config.password is None
assert config.dataset_pattern == AllowDenyPattern.allow_all()
assert config.chart_pattern == AllowDenyPattern.allow_all()
assert config.dashboard_pattern == AllowDenyPattern.allow_all()


def test_set_display_uri():
