class TimeoutHTTPAdapter(HTTPAdapter):
    """HTTP adapter that supplies a default timeout for requests it sends.

    Accepts every argument ``HTTPAdapter`` accepts, plus an optional
    keyword-only ``timeout``. The stored value is applied in ``send`` only
    when the caller did not provide a timeout of its own (the ``timeout``
    keyword is absent or ``None``).
    """

    def __init__(self, *args, timeout=None, **kwargs):
        """Remember the default timeout and forward everything else.

        Args:
            *args: Positional arguments passed through to ``HTTPAdapter``.
            timeout: Default timeout to inject in ``send``. ``None`` means
                no default is injected.
            **kwargs: Keyword arguments passed through to ``HTTPAdapter``.
        """
        # Declared as an explicit keyword-only parameter so it never reaches
        # HTTPAdapter.__init__, and assigned unconditionally so the attribute
        # exists whether or not the caller supplied it.
        self.timeout = timeout
        super().__init__(*args, **kwargs)

    def send(self, request, **kwargs):
        """Send ``request``, filling in the default timeout when none is set.

        Args:
            request: The request object handed to ``HTTPAdapter.send``.
            **kwargs: Keyword arguments for ``HTTPAdapter.send``; an explicit
                non-``None`` ``timeout`` always takes precedence.

        Returns:
            Whatever ``HTTPAdapter.send`` returns for the request.
        """
        # getattr keeps this safe for instances that were created without
        # running __init__ (for example, restored from serialized state).
        default_timeout = getattr(self, "timeout", None)
        if kwargs.get("timeout") is None and default_timeout is not None:
            kwargs["timeout"] = default_timeout
        return super().send(request, **kwargs)
{}).get("retry", {})) + retries = Retry(**retry_policy) + logger.debug(f"Retry policy to be set: {retry_policy}") + timeout = catalog_config.get("connection", {}).get( + "timeout", DEFAULT_REST_TIMEOUT + ) + logger.debug(f"Timeout to be set: {timeout}") + catalog._session.mount( + "http://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries) + ) + catalog._session.mount( + "https://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries) + ) + return catalog class TopTableTimings: