Skip to content

Commit

Permalink
Merge pull request #440 from atlanhq/ft-650
Browse files Browse the repository at this point in the history
FT-650 Add timeout to pyatlan client.
  • Loading branch information
ErnestoLoma authored Dec 11, 2024
2 parents 5bae30e + dc0c01a commit ae9732f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 146 deletions.
30 changes: 1 addition & 29 deletions pyatlan/client/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
DELETE_ENTITY_BY_ATTRIBUTE,
GET_ENTITY_BY_GUID,
GET_ENTITY_BY_UNIQUE_ATTRIBUTE,
GET_LINEAGE,
GET_LINEAGE_LIST,
INDEX_SEARCH,
PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE,
Expand Down Expand Up @@ -87,12 +86,7 @@
SortOrder,
)
from pyatlan.model.fields.atlan_fields import AtlanField
from pyatlan.model.lineage import (
LineageDirection,
LineageListRequest,
LineageRequest,
LineageResponse,
)
from pyatlan.model.lineage import LineageDirection, LineageListRequest
from pyatlan.model.response import AssetMutationResponse
from pyatlan.model.search import (
DSL,
Expand Down Expand Up @@ -256,28 +250,6 @@ def _get_aggregations(self, raw_json) -> Optional[Aggregations]:
pass
return aggregations

# TODO: Try adding @validate_arguments to this method once
# the issue below is fixed or when we switch to pydantic v2
# https://github.com/pydantic/pydantic/issues/2901
def get_lineage(self, lineage_request: LineageRequest) -> LineageResponse:
"""
Deprecated — this is an older, slower operation to retrieve lineage that will not receive further enhancements.
Use the get_lineage_list operation instead.
:param lineage_request: detailing the lineage query, parameters, and so on to run
:returns: the results of the lineage request
:raises AtlanError: on any API communication issue
"""
warn(
"Lineage retrieval using this method is deprecated, please use 'get_lineage_list' instead.",
DeprecationWarning,
stacklevel=2,
)
raw_json = self._client._call_api(
GET_LINEAGE, None, lineage_request, exclude_unset=False
)
return LineageResponse(**raw_json)

# TODO: Try adding @validate_arguments to this method once
# the issue below is fixed or when we switch to pydantic v2
# https://github.com/pydantic/pydantic/issues/2901
Expand Down
59 changes: 33 additions & 26 deletions pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
from pyatlan.model.custom_metadata import CustomMetadataDict
from pyatlan.model.enums import AtlanConnectorType, AtlanTypeCategory, CertificateStatus
from pyatlan.model.group import AtlanGroup, CreateGroupResponse, GroupResponse
from pyatlan.model.lineage import LineageListRequest, LineageRequest, LineageResponse
from pyatlan.model.lineage import LineageListRequest
from pyatlan.model.query import ParsedQuery, QueryParserRequest
from pyatlan.model.response import AssetMutationResponse
from pyatlan.model.role import RoleResponse
Expand Down Expand Up @@ -107,7 +107,7 @@ def get_adapter() -> logging.LoggerAdapter:
LOGGER = get_adapter()

DEFAULT_RETRY = Retry(
total=3,
total=5,
backoff_factor=1,
status_forcelist=[403, 429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "DELETE"],
Expand All @@ -123,11 +123,7 @@ def log_response(response, *args, **kwargs):


def get_session():
retry_strategy = DEFAULT_RETRY
adapter = HTTPAdapter(max_retries=retry_strategy)
session = requests.session()
session.mount(HTTPS_PREFIX, adapter)
session.mount(HTTP_PREFIX, adapter)
session.headers.update(
{
"x-atlan-agent": "sdk",
Expand All @@ -144,6 +140,9 @@ class AtlanClient(BaseSettings):
_default_client: "ClassVar[Optional[AtlanClient]]" = None
base_url: Union[Literal["INTERNAL"], HttpUrl]
api_key: str
connect_timeout: float = 30.0
read_timeout: float = 120.0
retry: Retry = DEFAULT_RETRY
_session: requests.Session = PrivateAttr(default_factory=get_session)
_request_params: dict = PrivateAttr()
_workflow_client: Optional[WorkflowClient] = PrivateAttr(default=None)
Expand Down Expand Up @@ -195,6 +194,10 @@ def __init__(self, **data):
"authorization": f"Bearer {self.api_key}",
}
}
session = self._session
adapter = HTTPAdapter(max_retries=self.retry)
session.mount(HTTPS_PREFIX, adapter)
session.mount(HTTP_PREFIX, adapter)
AtlanClient._default_client = self

@property
Expand Down Expand Up @@ -336,16 +339,29 @@ def _call_api_internal(
params["headers"]["X-Atlan-Request-Id"] = request_id_var.get()
if binary_data:
response = self._session.request(
api.method.value, path, data=binary_data, **params
api.method.value,
path,
data=binary_data,
**params,
timeout=(self.connect_timeout, self.read_timeout),
)
elif api.consumes == EVENT_STREAM and api.produces == EVENT_STREAM:
response = self._session.request(
api.method.value, path, **params, stream=True
api.method.value,
path,
**params,
stream=True,
timeout=(self.connect_timeout, self.read_timeout),
)
if download_file_path:
return self._handle_file_download(response.raw, download_file_path)
else:
response = self._session.request(api.method.value, path, **params)
response = self._session.request(
api.method.value,
path,
**params,
timeout=(self.connect_timeout, self.read_timeout),
)
if response is not None:
LOGGER.debug("HTTP Status: %s", response.status_code)
if response is None:
Expand Down Expand Up @@ -1293,22 +1309,6 @@ def find_connections_by_name(
name=name, connector_type=connector_type, attributes=attributes
)

def get_lineage(self, lineage_request: LineageRequest) -> LineageResponse:
"""
Deprecated — this is an older, slower operation to retrieve lineage that will not receive further enhancements.
Use the get_lineage_list operation instead.
:param lineage_request: detailing the lineage query, parameters, and so on to run
:returns: the results of the lineage request
:raises AtlanError: on any API communication issue
"""
warn(
"Lineage retrieval using this method is deprecated, please use 'get_lineage_list' instead.",
DeprecationWarning,
stacklevel=2,
)
return self.asset.get_lineage(lineage_request=lineage_request)

def get_lineage_list(
self, lineage_request: LineageListRequest
) -> LineageListResults:
Expand Down Expand Up @@ -1609,7 +1609,11 @@ def max_retries(

@contextlib.contextmanager
def client_connection(
base_url: Optional[HttpUrl] = None, api_key: Optional[str] = None
base_url: Optional[HttpUrl] = None,
api_key: Optional[str] = None,
connect_timeout: float = 30.0,
read_timeout: float = 120.0,
retry: Retry = DEFAULT_RETRY,
) -> Generator[AtlanClient, None, None]:
"""
Creates a new client created with the given base_url and/api_key. The AtlanClient.default_client will
Expand All @@ -1622,6 +1626,9 @@ def client_connection(
tmp_client = AtlanClient(
base_url=base_url or current_client.base_url,
api_key=api_key or current_client.api_key,
connect_timeout=connect_timeout,
read_timeout=read_timeout,
retry=retry,
)
try:
yield tmp_client
Expand Down
91 changes: 1 addition & 90 deletions tests/integration/lineage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
EntityStatus,
LineageDirection,
)
from pyatlan.model.lineage import FluentLineage, LineageRequest
from pyatlan.model.lineage import FluentLineage
from pyatlan.model.search import DSL, Bool, IndexSearchRequest, Prefix, Term
from tests.integration.client import TestId, delete_asset
from tests.integration.connection_test import create_connection
Expand Down Expand Up @@ -535,95 +535,6 @@ def test_cp_lineage_end(
_assert_lineage(column3, column5, cp_lineage_end)


def test_fetch_lineage_start(
client: AtlanClient,
connection: Connection,
database: Database,
schema: Schema,
table: Table,
mview: MaterialisedView,
view: View,
lineage_start: Process,
lineage_end: Process,
):
lineage = LineageRequest(guid=table.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_fetch_lineage_start(response, table, mview, view)


def test_cp_fetch_lineage_start(
client: AtlanClient,
column1: Column,
column3: Column,
column5: Column,
cp_lineage_start: ColumnProcess,
cp_lineage_end: ColumnProcess,
):
lineage = LineageRequest(guid=column1.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_fetch_lineage_start(response, column1, column3, column5)


def test_fetch_lineage_middle(
client: AtlanClient,
connection: Connection,
database: Database,
schema: Schema,
table: Table,
mview: MaterialisedView,
view: View,
lineage_start: Process,
lineage_end: Process,
):
lineage = LineageRequest(guid=mview.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_lineage_middle(response, table, mview, view, lineage_start, lineage_end)


def test_cp_fetch_lineage_middle(
client: AtlanClient,
column1: Column,
column3: Column,
column5: Column,
cp_lineage_start: ColumnProcess,
cp_lineage_end: ColumnProcess,
):
lineage = LineageRequest(guid=column3.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_lineage_middle(
response, column1, column3, column5, cp_lineage_start, cp_lineage_end
)


def test_fetch_lineage_end(
client: AtlanClient,
connection: Connection,
database: Database,
schema: Schema,
table: Table,
mview: MaterialisedView,
view: View,
lineage_start: Process,
lineage_end: Process,
):
lineage = LineageRequest(guid=view.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_fetch_lineage_end(response, table, mview, view)


def test_cp_fetch_lineage_end(
client: AtlanClient,
column1: Column,
column3: Column,
column5: Column,
cp_lineage_start: ColumnProcess,
cp_lineage_end: ColumnProcess,
):
lineage = LineageRequest(guid=column5.guid, hide_process=True)
response = client.asset.get_lineage(lineage)
_assert_fetch_lineage_end(response, column1, column3, column5)


def test_fetch_lineage_start_list(
client: AtlanClient,
connection: Connection,
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/purpose_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ def test_retrieve_purpose(
assert full.policy_mask_type == DataMaskingType.REDACT


@pytest.mark.skip(reason="Test failing with HekaException")
@pytest.mark.order(after="test_retrieve_purpose")
def test_run_query_without_policy(client: AtlanClient, assign_tag_to_asset, query):
response = client.queries.stream(request=query)
Expand All @@ -288,6 +289,7 @@ def test_token_permissions(client: AtlanClient, token):
)


@pytest.mark.skip(reason="Test failing with HekaException")
@pytest.mark.order(after="test_token_permissions")
def test_run_query_with_policy(assign_tag_to_asset, token, query):
with client_connection(api_key=token.attributes.access_token) as redacted:
Expand Down
26 changes: 25 additions & 1 deletion tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
from unittest.mock import patch

import pytest
import requests.exceptions
from urllib3 import Retry

from pyatlan.cache.source_tag_cache import SourceTagName
from pyatlan.client.asset import LOGGER, IndexSearchResults
from pyatlan.client.atlan import AtlanClient
from pyatlan.client.atlan import AtlanClient, client_connection
from pyatlan.model.assets import Asset, AtlasGlossaryTerm, Column, Table
from pyatlan.model.core import AtlanTag, AtlanTagName
from pyatlan.model.enums import AtlanConnectorType, CertificateStatus, SortOrder
Expand Down Expand Up @@ -712,3 +714,25 @@ def test_default_sorting(client: AtlanClient):
assert len(sort_options) == 2
assert sort_options[0].field == QUALIFIED_NAME
assert sort_options[1].field == ASSET_GUID


def test_read_timeout(client: AtlanClient):
request = (FluentSearch().select()).to_request()
with client_connection(read_timeout=0.1, retry=Retry(total=0)) as timed_client:
with pytest.raises(
requests.exceptions.ReadTimeout,
match=".Read timed out\. \(read timeout=0\.1\)", # noqa W605
):
timed_client.asset.search(criteria=request)


def test_connect_timeout(client: AtlanClient):
request = (FluentSearch().select()).to_request()
with client_connection(
connect_timeout=0.0001, retry=Retry(total=0)
) as timed_client:
with pytest.raises(
requests.exceptions.ConnectionError,
match=".(timed out\. \(connect timeout=0\.0001\))|(Failed to establish a new connection.)", # noqa W605
):
timed_client.asset.search(criteria=request)

0 comments on commit ae9732f

Please sign in to comment.