Skip to content

Commit

Permalink
Initial approach to custom http adapters
Browse files Browse the repository at this point in the history
  • Loading branch information
skrydal committed Mar 5, 2025
1 parent be42e11 commit 883d3b2
Showing 1 changed file with 40 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from humanfriendly import format_timespan
from pydantic import Field, validator
from pyiceberg.catalog import Catalog, load_catalog
from pyiceberg.catalog.rest import RestCatalog
from requests.adapters import HTTPAdapter
from sortedcontainers import SortedList
from urllib3.util import Retry

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import DatasetSourceConfigMixin
Expand All @@ -26,6 +29,23 @@

logger = logging.getLogger(__name__)

DEFAULT_REST_TIMEOUT = 120
DEFAULT_REST_RETRY_POLICY = {"total": 3, "backoff_factor": 0.1}


class TimeoutHTTPAdapter(HTTPAdapter):
def __init__(self, *args, **kwargs):
if "timeout" in kwargs:
self.timeout = kwargs["timeout"]
del kwargs["timeout"]
super().__init__(*args, **kwargs)

def send(self, request, **kwargs):
timeout = kwargs.get("timeout")
if timeout is None and hasattr(self, "timeout"):
kwargs["timeout"] = self.timeout
return super().send(request, **kwargs)


class IcebergProfilingConfig(ConfigModel):
enabled: bool = Field(
Expand Down Expand Up @@ -146,7 +166,26 @@ def get_catalog(self) -> Catalog:
logger.debug(
"Initializing the catalog %s with config: %s", catalog_name, catalog_config
)
return load_catalog(name=catalog_name, **catalog_config)
catalog = load_catalog(name=catalog_name, **catalog_config)
if isinstance(catalog, RestCatalog):
logger.debug(
"Recognized REST catalog type being configured, attempting to configure HTTP Adapter for the session"
)
retry_policy: Dict[str, Any] = DEFAULT_REST_RETRY_POLICY.copy()
retry_policy.update(catalog_config.get("connection", {}).get("retry", {}))
retries = Retry(**retry_policy)
logger.debug(f"Retry policy to be set: {retry_policy}")
timeout = catalog_config.get("connection", {}).get(
"timeout", DEFAULT_REST_TIMEOUT
)
logger.debug(f"Timeout to be set: {timeout}")
catalog._session.mount(
"http://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries)
)
catalog._session.mount(
"https://", TimeoutHTTPAdapter(timeout=timeout, max_retries=retries)
)
return catalog


class TopTableTimings:
Expand Down

0 comments on commit 883d3b2

Please sign in to comment.