diff --git a/tests/integration/purpose_test.py b/tests/integration/purpose_test.py index 44d5c66b2..d624d73af 100644 --- a/tests/integration/purpose_test.py +++ b/tests/integration/purpose_test.py @@ -1,11 +1,13 @@ # SPDX-License-Identifier: Apache-2.0 # Copyright 2022 Atlan Pte. Ltd. +import contextlib import time from typing import Generator import pytest -from pyatlan.client.atlan import AtlanClient +from pyatlan.client.atlan import AtlanClient, client_connection +from pyatlan.errors import NotFoundError from pyatlan.model.api_tokens import ApiToken from pyatlan.model.assets import AuthPolicy, Column, Purpose from pyatlan.model.constants import SERVICE_ACCOUNT_ @@ -13,7 +15,6 @@ from pyatlan.model.enums import ( AssetSidebarTab, AtlanConnectorType, - AtlanTagColor, AuthPolicyType, DataAction, DataMaskingType, @@ -22,7 +23,6 @@ QueryStatus, ) from pyatlan.model.query import QueryRequest -from pyatlan.model.typedef import AtlanTagDef from tests.integration.client import TestId, delete_asset from tests.integration.requests_test import delete_token @@ -47,25 +47,6 @@ def snowflake_column_qn(snowflake_conn): return f"{snowflake_conn.qualified_name}/{DB_NAME}/{SCHEMA_NAME}/{TABLE_NAME}/{COLUMN_NAME}" -@pytest.fixture(scope="module") -def atlan_tag_def( - client: AtlanClient, - snowflake_column_qn, -) -> Generator[AtlanTagDef, None, None]: - atlan_tag_def = AtlanTagDef.create(name=MODULE_NAME, color=AtlanTagColor.GREEN) - typedef = client.typedef.create(atlan_tag_def) - yield typedef.atlan_tag_defs[0] - # The client can be re-instantiated inside test cases, e.g `test_run_query_with_policy`. - # Therefore, here we need to explicitly create the client - client = AtlanClient() - client.asset.remove_atlan_tag( - asset_type=Column, - qualified_name=snowflake_column_qn, - atlan_tag_name=MODULE_NAME, - ) - client.typedef.purge(MODULE_NAME, typedef_type=AtlanTagDef) - - @pytest.fixture(scope="module") def token(client: AtlanClient) -> Generator[ApiToken, None, None]: token = None @@ -105,8 +86,8 @@ def query(snowflake_conn) -> QueryRequest: @pytest.fixture(scope="module") -def atlan_tag_name(atlan_tag_def): - return AtlanTagName(atlan_tag_def.display_name) +def atlan_tag_name(make_atlan_tag): + return AtlanTagName(make_atlan_tag(name=MODULE_NAME).display_name) @pytest.fixture(scope="module") @@ -123,7 +104,7 @@ def purpose( @pytest.fixture(scope="module") def assign_tag_to_asset(client, snowflake_column_qn): - result = client.asset.add_atlan_tags( + yield client.asset.add_atlan_tags( asset_type=Column, qualified_name=snowflake_column_qn, atlan_tag_names=[MODULE_NAME], @@ -131,7 +112,11 @@ def assign_tag_to_asset(client, snowflake_column_qn): remove_propagation_on_delete=False, restrict_lineage_propagation=False, ) - return result + client.asset.remove_atlan_tag( + asset_type=Column, + qualified_name=snowflake_column_qn, + atlan_tag_name=MODULE_NAME, + ) def test_query(query): @@ -185,15 +170,16 @@ def test_find_purpose_by_name( client: AtlanClient, purpose: Purpose, ): - result = client.asset.find_purposes_by_name( - MODULE_NAME, attributes=["purposeClassifications"] - ) - count = 0 - # TODO: replace with exponential back-off and jitter - while not result and count < 10: - time.sleep(2) - result = client.asset.find_purposes_by_name(MODULE_NAME) - count += 1 + with contextlib.suppress(NotFoundError): + result = client.asset.find_purposes_by_name( + MODULE_NAME, attributes=["purposeClassifications"] + ) + count = 0 + # TODO: replace with exponential back-off and jitter + while not result and count < 10: + time.sleep(2) + result = client.asset.find_purposes_by_name(MODULE_NAME) + count += 1 assert result assert len(result) == 1 assert result[0].guid == purpose.guid @@ -303,37 +289,35 @@ def test_token_permissions(client: AtlanClient, token): @pytest.mark.order(after="test_token_permissions") -def test_run_query_with_policy(client: AtlanClient, assign_tag_to_asset, token, query): - redacted = AtlanClient( - base_url=client.base_url, api_key=token.attributes.access_token - ) - # The policy will take some time to go into effect - # start by waiting a reasonable set amount of time - # (limit the same query re-running multiple times on data store) - time.sleep(30) - count = 0 - response = None - found = HekaFlow.BYPASS - - # TODO: replace with exponential back-off and jitter - while found == HekaFlow.BYPASS and count < 30: - time.sleep(2) - response = redacted.queries.stream(query) - assert response - assert response.details - assert response.details.status - assert response.details.heka_flow - status = response.details.status - if status != QueryStatus.ERROR: - found = response.details.heka_flow - count += 1 +def test_run_query_with_policy(assign_tag_to_asset, token, query): + with client_connection(api_key=token.attributes.access_token) as redacted: + # The policy will take some time to go into effect + # start by waiting a reasonable set amount of time + # (limit the same query re-running multiple times on data store) + time.sleep(30) + count = 0 + response = None + found = HekaFlow.BYPASS + + # TODO: replace with exponential back-off and jitter + while found == HekaFlow.BYPASS and count < 30: + time.sleep(2) + response = redacted.queries.stream(query) + assert response + assert response.details + assert response.details.status + assert response.details.heka_flow + status = response.details.status + if status != QueryStatus.ERROR: + found = response.details.heka_flow + count += 1 - assert response - assert response.rows - assert len(response.rows) > 1 - row = response.rows[0] - assert row - assert len(row) == 7 - assert row[2] - # Ensure it IS redacted - assert row[2].startswith("Xx") + assert response + assert response.rows + assert len(response.rows) > 1 + row = response.rows[0] + assert row + assert len(row) == 7 + assert row[2] + # Ensure it IS redacted + assert row[2].startswith("Xx") diff --git a/tests/integration/test_task_client.py b/tests/integration/test_task_client.py index e3a2a16db..467198d3f 100644 --- a/tests/integration/test_task_client.py +++ b/tests/integration/test_task_client.py @@ -5,12 +5,7 @@ from pyatlan.client.atlan import AtlanClient from pyatlan.model.assets import Column -from pyatlan.model.enums import ( - AtlanConnectorType, - AtlanTagColor, - AtlanTaskType, - SortOrder, -) +from pyatlan.model.enums import AtlanConnectorType, AtlanTaskType, SortOrder from pyatlan.model.fluent_tasks import FluentTasks from pyatlan.model.search import SortItem from pyatlan.model.task import AtlanTask, TaskSearchRequest @@ -39,7 +34,9 @@ def snowflake_column_qn(snowflake_conn): @pytest.fixture() -def snowflake_column(client: AtlanClient, snowflake_column_qn) -> Column: +def snowflake_column( + client: AtlanClient, snowflake_column_qn +) -> Generator[Column, None, None]: client.asset.add_atlan_tags( asset_type=Column, qualified_name=snowflake_column_qn, @@ -51,7 +48,13 @@ def snowflake_column(client: AtlanClient, snowflake_column_qn) -> Column: snowflake_column = client.asset.get_by_qualified_name( snowflake_column_qn, asset_type=Column ) - return snowflake_column + yield snowflake_column + + client.asset.remove_atlan_tag( + asset_type=Column, + qualified_name=snowflake_column_qn, + atlan_tag_name=TAG_NAME, + ) @pytest.fixture() @@ -72,19 +75,8 @@ def task_search_request(snowflake_column: Column) -> TaskSearchRequest: @pytest.fixture(scope="module") -def atlan_tag_def( - client: AtlanClient, - snowflake_column_qn, -) -> Generator[AtlanTagDef, None, None]: - atlan_tag_def = AtlanTagDef.create(name=TAG_NAME, color=AtlanTagColor.GREEN) - typedef = client.typedef.create(atlan_tag_def) - yield typedef.atlan_tag_defs[0] - client.asset.remove_atlan_tag( - asset_type=Column, - qualified_name=snowflake_column_qn, - atlan_tag_name=TAG_NAME, - ) - client.typedef.purge(TAG_NAME, typedef_type=AtlanTagDef) +def atlan_tag_def(make_atlan_tag) -> AtlanTagDef: + return make_atlan_tag(TAG_NAME) def test_task_search(