Skip to content

Commit

Permalink
feat(ingest): add dataset deny pattern for superset and preset
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinkarchacryl committed Feb 26, 2025
1 parent 7cfee8b commit e299418
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
34 changes: 32 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/superset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Optional
Expand Down Expand Up @@ -74,6 +75,7 @@
DatasetPropertiesClass,
)
from datahub.utilities import config_clean
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -101,6 +103,14 @@
platform_without_databases = ["druid"]


@dataclass
class SupersetSourceReport(StaleEntityRemovalSourceReport):
filtered: LossyList[str] = field(default_factory=LossyList)

def report_dropped(self, name: str) -> None:
self.filtered.append(name)


class SupersetDataset(BaseModel):
id: int
table_name: str
Expand Down Expand Up @@ -136,6 +146,10 @@ class SupersetConfig(
default=dict(),
description="regex patterns for tables to filter to assign domain_key. ",
)
database_pattern: Optional[AllowDenyPattern] = Field(
default=None,
description="Regex patterns for databases to filter in ingestion.",
)
username: Optional[str] = Field(default=None, description="Superset username.")
password: Optional[str] = Field(default=None, description="Superset password.")
# Configuration for stateful ingestion
Expand Down Expand Up @@ -216,7 +230,7 @@ class SupersetSource(StatefulIngestionSourceBase):
"""

config: SupersetConfig
report: StaleEntityRemovalSourceReport
report: SupersetSourceReport
platform = "superset"

def __hash__(self):
Expand All @@ -225,7 +239,7 @@ def __hash__(self):
def __init__(self, ctx: PipelineContext, config: SupersetConfig):
super().__init__(config, ctx)
self.config = config
self.report = StaleEntityRemovalSourceReport()
self.report = SupersetSourceReport()
if self.config.domain:
self.domain_registry = DomainRegistry(
cached_domains=[domain_id for domain_id in self.config.domain],
Expand Down Expand Up @@ -619,6 +633,22 @@ def construct_dataset_from_dataset_data(
def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
for dataset_data in self.paginate_entity_api_results("dataset", PAGE_SIZE):
try:
dataset_id = dataset_data.get("id")
dataset_response = self.get_dataset_info(dataset_id)

database_name = (
dataset_response.get("result", {})
.get("database", {})
.get("database_name", "")
)

if (
self.config.database_pattern
and not self.config.database_pattern.allowed(database_name)
):
self.report.report_dropped(database_name)
continue

dataset_snapshot = self.construct_dataset_from_dataset_data(
dataset_data
)
Expand Down
21 changes: 21 additions & 0 deletions metadata-ingestion/tests/unit/test_preset_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,24 @@ def test_preset_config_parsing():

# Test that regular Superset fields are still parsed
assert config.connect_uri == "https://preset.io"


def test_database_pattern():
db_pattern = "test_database1"

config = PresetConfig.parse_obj(
{
"database_pattern": {
"allow": [".*"],
"deny": [db_pattern],
"ignoreCase": False,
}
}
)

assert config.database_pattern.allow == [".*"]
assert config.database_pattern.deny == [db_pattern]
assert config.database_pattern.ignoreCase is False

assert config.database_pattern.allowed("test_db2") is True
assert config.database_pattern.allowed(db_pattern) is False
21 changes: 21 additions & 0 deletions metadata-ingestion/tests/unit/test_superset_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,24 @@ def test_set_display_uri():

assert config.connect_uri == "http://localhost:8088"
assert config.display_uri == display_uri


def test_database_pattern():
db_pattern = "test_database1"

config = SupersetConfig.parse_obj(
{
"database_pattern": {
"allow": [".*"],
"deny": [db_pattern],
"ignoreCase": False,
}
}
)

assert config.database_pattern.allow == [".*"]
assert config.database_pattern.deny == [db_pattern]
assert config.database_pattern.ignoreCase is False

assert config.database_pattern.allowed("test_db2") is True
assert config.database_pattern.allowed(db_pattern) is False

0 comments on commit e299418

Please sign in to comment.