diff --git a/metadata-ingestion/src/datahub/ingestion/source/superset.py b/metadata-ingestion/src/datahub/ingestion/source/superset.py index cd4c7c685b8ae..825ce500ba78c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/superset.py +++ b/metadata-ingestion/src/datahub/ingestion/source/superset.py @@ -307,22 +307,28 @@ def paginate_entity_api_results(self, entity_type, page_size=100): total_items = page_size while current_page * page_size < total_items: - response = self.session.get( - f"{self.config.connect_uri}/api/v1/{entity_type}", - params={"q": f"(page:{current_page},page_size:{page_size})"}, - ) + try: + response = self.session.get( + f"{self.config.connect_uri}/api/v1/{entity_type}", + params={"q": f"(page:{current_page},page_size:{page_size})"}, + timeout=10, + ) - if response.status_code != 200: - logger.warning(f"Failed to get {entity_type} data: {response.text}") + if response.status_code != 200: + logger.warning(f"Failed to get {entity_type} data: {response.text}") - payload = response.json() - # Update total_items with the actual count from the response - total_items = payload.get("count", total_items) - # Yield each item in the result, this gets passed into the construct functions - for item in payload.get("result", []): - yield item + payload = response.json() + # Update total_items with the actual count from the response + total_items = payload.get("count", total_items) + # Yield each item in the result, this gets passed into the construct functions + for item in payload.get("result", []): + yield item - current_page += 1 + current_page += 1 + except requests.exceptions.Timeout: + logger.info( + f"Request timed out while fetching {entity_type} data (page {current_page})." + ) def parse_owner_info(self) -> Dict[str, Any]: entity_types = ["dataset", "dashboard", "chart"] @@ -345,13 +351,19 @@ def build_owner_urn(self, data: Dict[str, Any]) -> List[str]: @lru_cache(maxsize=None) def get_dataset_info(self, dataset_id: int) -> dict: - dataset_response = self.session.get( - f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}", - ) - if dataset_response.status_code != 200: - logger.warning(f"Failed to get dataset info: {dataset_response.text}") - dataset_response.raise_for_status() - return dataset_response.json() + try: + dataset_response = self.session.get( + f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}", + timeout=10, + ) + if dataset_response.status_code != 200: + logger.warning(f"Failed to get dataset info: {dataset_response.text}") + dataset_response.raise_for_status() + return dataset_response.json() + except requests.exceptions.Timeout: + logger.info( + f"Request timed out while fetching dataset info for {dataset_id}." + ) def get_datasource_urn_from_id( self, dataset_response: dict, platform_instance: str @@ -392,7 +404,7 @@ def construct_dashboard_from_api_data( ) -> DashboardSnapshot: dashboard_urn = make_dashboard_urn( platform=self.platform, - name=dashboard_data["id"], + name=str(dashboard_data["id"]), platform_instance=self.config.platform_instance, ) dashboard_snapshot = DashboardSnapshot( @@ -424,7 +436,7 @@ def construct_dashboard_from_api_data( chart_urns.append( make_chart_urn( platform=self.platform, - name=value.get("meta", {}).get("chartId", "unknown"), + name=str(value.get("meta", {}).get("chartId", "unknown")), platform_instance=self.config.platform_instance, ) ) @@ -507,7 +519,7 @@ def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]: def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot: chart_urn = make_chart_urn( platform=self.platform, - name=chart_data["id"], + name=str(chart_data["id"]), platform_instance=self.config.platform_instance, ) chart_snapshot = ChartSnapshot( @@ -609,7 +621,7 @@ def emit_chart_mces(self) -> Iterable[MetadataWorkUnit]: for chart_data in self.paginate_entity_api_results("chart/", PAGE_SIZE): try: chart_id = str(chart_data.get("id")) - chart_name = chart_data.get("slice_name", "") + chart_name = str(chart_data.get("slice_name", "")) if not self.config.chart_pattern.allowed(chart_name): self.report.report_dropped(