diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py
index a8b328f6e17739..07beae6a5719c0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/superset.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py
@@ -1,5 +1,6 @@
 import json
 import logging
+from dataclasses import dataclass, field
 from datetime import datetime
 from functools import lru_cache
 from typing import Any, Dict, Iterable, List, Optional
@@ -74,6 +75,7 @@
     DatasetPropertiesClass,
 )
 from datahub.utilities import config_clean
+from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.registries.domain_registry import DomainRegistry
 
 logger = logging.getLogger(__name__)
@@ -101,6 +103,18 @@
 platform_without_databases = ["druid"]
 
 
+@dataclass
+class SupersetSourceReport(StaleEntityRemovalSourceReport):
+    """Source report that additionally tracks names dropped by filtering."""
+
+    # Names passed to report_dropped(), in call order.
+    filtered: LossyList[str] = field(default_factory=LossyList)
+
+    def report_dropped(self, name: str) -> None:
+        """Append ``name`` to the ``filtered`` list."""
+        self.filtered.append(name)
+
+
 class SupersetDataset(BaseModel):
     id: int
     table_name: str
@@ -136,6 +150,10 @@ class SupersetConfig(
         default=dict(),
         description="regex patterns for tables to filter to assign domain_key. ",
     )
+    database_pattern: Optional[AllowDenyPattern] = Field(
+        default=None,
+        description="Regex patterns for databases to filter in ingestion.",
+    )
     username: Optional[str] = Field(default=None, description="Superset username.")
     password: Optional[str] = Field(default=None, description="Superset password.")
     # Configuration for stateful ingestion
@@ -216,7 +234,7 @@ class SupersetSource(StatefulIngestionSourceBase):
     """
 
     config: SupersetConfig
-    report: StaleEntityRemovalSourceReport
+    report: SupersetSourceReport
     platform = "superset"
 
     def __hash__(self):
@@ -225,7 +243,7 @@ def __hash__(self):
     def __init__(self, ctx: PipelineContext, config: SupersetConfig):
         super().__init__(config, ctx)
         self.config = config
-        self.report = StaleEntityRemovalSourceReport()
+        self.report = SupersetSourceReport()
         if self.config.domain:
             self.domain_registry = DomainRegistry(
                 cached_domains=[domain_id for domain_id in self.config.domain],
@@ -619,6 +637,21 @@ def construct_dataset_from_dataset_data(
     def emit_dataset_mces(self) -> Iterable[MetadataWorkUnit]:
         for dataset_data in self.paginate_entity_api_results("dataset", PAGE_SIZE):
             try:
+                database_pattern = self.config.database_pattern
+                if database_pattern:
+                    # Fetch the dataset details only when a database filter
+                    # is configured.
+                    dataset_response = self.get_dataset_info(dataset_data.get("id"))
+                    # Fall back to empty values so a null "result", "database"
+                    # or "database_name" in the response does not raise here.
+                    result = dataset_response.get("result") or {}
+                    database = result.get("database") or {}
+                    database_name = database.get("database_name") or ""
+
+                    if not database_pattern.allowed(database_name):
+                        self.report.report_dropped(database_name)
+                        continue
+
                 dataset_snapshot = self.construct_dataset_from_dataset_data(
                     dataset_data
                 )
diff --git a/metadata-ingestion/tests/unit/test_preset_source.py b/metadata-ingestion/tests/unit/test_preset_source.py
index dc81f4c8284d50..b40c671f8d34d8 100644
--- a/metadata-ingestion/tests/unit/test_preset_source.py
+++ b/metadata-ingestion/tests/unit/test_preset_source.py
@@ -40,3 +40,25 @@ def test_preset_config_parsing():
 
     # Test that regular Superset fields are still parsed
     assert config.connect_uri == "https://preset.io"
+
+
+def test_database_pattern():
+    db_pattern = "test_database1"
+
+    config = PresetConfig.parse_obj(
+        {
+            "database_pattern": {
+                "allow": [".*"],
+                "deny": [db_pattern],
+                "ignoreCase": False,
+            }
+        }
+    )
+
+    assert config.database_pattern is not None
+    assert config.database_pattern.allow == [".*"]
+    assert config.database_pattern.deny == [db_pattern]
+    assert config.database_pattern.ignoreCase is False
+
+    assert config.database_pattern.allowed("test_db2") is True
+    assert config.database_pattern.allowed(db_pattern) is False
diff --git a/metadata-ingestion/tests/unit/test_superset_source.py b/metadata-ingestion/tests/unit/test_superset_source.py
index 912bfa3511421c..a7d4152ec3bd06 100644
--- a/metadata-ingestion/tests/unit/test_superset_source.py
+++ b/metadata-ingestion/tests/unit/test_superset_source.py
@@ -19,3 +19,25 @@ def test_set_display_uri():
 
     assert config.connect_uri == "http://localhost:8088"
     assert config.display_uri == display_uri
+
+
+def test_database_pattern():
+    db_pattern = "test_database1"
+
+    config = SupersetConfig.parse_obj(
+        {
+            "database_pattern": {
+                "allow": [".*"],
+                "deny": [db_pattern],
+                "ignoreCase": False,
+            }
+        }
+    )
+
+    assert config.database_pattern is not None
+    assert config.database_pattern.allow == [".*"]
+    assert config.database_pattern.deny == [db_pattern]
+    assert config.database_pattern.ignoreCase is False
+
+    assert config.database_pattern.allowed("test_db2") is True
+    assert config.database_pattern.allowed(db_pattern) is False