Skip to content

Commit

Permalink
Merge pull request #421 from atlanhq/FT-782
Browse files Browse the repository at this point in the history
Ft 782 Correct issue deleting created tags at end of tests
  • Loading branch information
ErnestoLoma authored Nov 20, 2024
2 parents f926064 + cdd7a78 commit 12883f7
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 89 deletions.
120 changes: 52 additions & 68 deletions tests/integration/purpose_test.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 Atlan Pte. Ltd.
import contextlib
import time
from typing import Generator

import pytest

from pyatlan.client.atlan import AtlanClient
from pyatlan.client.atlan import AtlanClient, client_connection
from pyatlan.errors import NotFoundError
from pyatlan.model.api_tokens import ApiToken
from pyatlan.model.assets import AuthPolicy, Column, Purpose
from pyatlan.model.constants import SERVICE_ACCOUNT_
from pyatlan.model.core import AtlanTagName
from pyatlan.model.enums import (
AssetSidebarTab,
AtlanConnectorType,
AtlanTagColor,
AuthPolicyType,
DataAction,
DataMaskingType,
Expand All @@ -22,7 +23,6 @@
QueryStatus,
)
from pyatlan.model.query import QueryRequest
from pyatlan.model.typedef import AtlanTagDef
from tests.integration.client import TestId, delete_asset
from tests.integration.requests_test import delete_token

Expand All @@ -47,25 +47,6 @@ def snowflake_column_qn(snowflake_conn):
return f"{snowflake_conn.qualified_name}/{DB_NAME}/{SCHEMA_NAME}/{TABLE_NAME}/{COLUMN_NAME}"


@pytest.fixture(scope="module")
def atlan_tag_def(
client: AtlanClient,
snowflake_column_qn,
) -> Generator[AtlanTagDef, None, None]:
atlan_tag_def = AtlanTagDef.create(name=MODULE_NAME, color=AtlanTagColor.GREEN)
typedef = client.typedef.create(atlan_tag_def)
yield typedef.atlan_tag_defs[0]
# The client can be re-instantiated inside test cases, e.g `test_run_query_with_policy`.
# Therefore, here we need to explicitly create the client
client = AtlanClient()
client.asset.remove_atlan_tag(
asset_type=Column,
qualified_name=snowflake_column_qn,
atlan_tag_name=MODULE_NAME,
)
client.typedef.purge(MODULE_NAME, typedef_type=AtlanTagDef)


@pytest.fixture(scope="module")
def token(client: AtlanClient) -> Generator[ApiToken, None, None]:
token = None
Expand Down Expand Up @@ -105,8 +86,8 @@ def query(snowflake_conn) -> QueryRequest:


@pytest.fixture(scope="module")
def atlan_tag_name(atlan_tag_def):
return AtlanTagName(atlan_tag_def.display_name)
def atlan_tag_name(make_atlan_tag):
return AtlanTagName(make_atlan_tag(name=MODULE_NAME).display_name)


@pytest.fixture(scope="module")
Expand All @@ -123,15 +104,19 @@ def purpose(

@pytest.fixture(scope="module")
def assign_tag_to_asset(client, snowflake_column_qn):
result = client.asset.add_atlan_tags(
yield client.asset.add_atlan_tags(
asset_type=Column,
qualified_name=snowflake_column_qn,
atlan_tag_names=[MODULE_NAME],
propagate=False,
remove_propagation_on_delete=False,
restrict_lineage_propagation=False,
)
return result
client.asset.remove_atlan_tag(
asset_type=Column,
qualified_name=snowflake_column_qn,
atlan_tag_name=MODULE_NAME,
)


def test_query(query):
Expand Down Expand Up @@ -185,15 +170,16 @@ def test_find_purpose_by_name(
client: AtlanClient,
purpose: Purpose,
):
result = client.asset.find_purposes_by_name(
MODULE_NAME, attributes=["purposeClassifications"]
)
count = 0
# TODO: replace with exponential back-off and jitter
while not result and count < 10:
time.sleep(2)
result = client.asset.find_purposes_by_name(MODULE_NAME)
count += 1
with contextlib.suppress(NotFoundError):
result = client.asset.find_purposes_by_name(
MODULE_NAME, attributes=["purposeClassifications"]
)
count = 0
# TODO: replace with exponential back-off and jitter
while not result and count < 10:
time.sleep(2)
result = client.asset.find_purposes_by_name(MODULE_NAME)
count += 1
assert result
assert len(result) == 1
assert result[0].guid == purpose.guid
Expand Down Expand Up @@ -303,37 +289,35 @@ def test_token_permissions(client: AtlanClient, token):


@pytest.mark.order(after="test_token_permissions")
def test_run_query_with_policy(client: AtlanClient, assign_tag_to_asset, token, query):
redacted = AtlanClient(
base_url=client.base_url, api_key=token.attributes.access_token
)
# The policy will take some time to go into effect
# start by waiting a reasonable set amount of time
# (limit the same query re-running multiple times on data store)
time.sleep(30)
count = 0
response = None
found = HekaFlow.BYPASS

# TODO: replace with exponential back-off and jitter
while found == HekaFlow.BYPASS and count < 30:
time.sleep(2)
response = redacted.queries.stream(query)
assert response
assert response.details
assert response.details.status
assert response.details.heka_flow
status = response.details.status
if status != QueryStatus.ERROR:
found = response.details.heka_flow
count += 1
def test_run_query_with_policy(assign_tag_to_asset, token, query):
with client_connection(api_key=token.attributes.access_token) as redacted:
# The policy will take some time to go into effect
# start by waiting a reasonable set amount of time
# (limit the same query re-running multiple times on data store)
time.sleep(30)
count = 0
response = None
found = HekaFlow.BYPASS

# TODO: replace with exponential back-off and jitter
while found == HekaFlow.BYPASS and count < 30:
time.sleep(2)
response = redacted.queries.stream(query)
assert response
assert response.details
assert response.details.status
assert response.details.heka_flow
status = response.details.status
if status != QueryStatus.ERROR:
found = response.details.heka_flow
count += 1

assert response
assert response.rows
assert len(response.rows) > 1
row = response.rows[0]
assert row
assert len(row) == 7
assert row[2]
# Ensure it IS redacted
assert row[2].startswith("Xx")
assert response
assert response.rows
assert len(response.rows) > 1
row = response.rows[0]
assert row
assert len(row) == 7
assert row[2]
# Ensure it IS redacted
assert row[2].startswith("Xx")
34 changes: 13 additions & 21 deletions tests/integration/test_task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@

from pyatlan.client.atlan import AtlanClient
from pyatlan.model.assets import Column
from pyatlan.model.enums import (
AtlanConnectorType,
AtlanTagColor,
AtlanTaskType,
SortOrder,
)
from pyatlan.model.enums import AtlanConnectorType, AtlanTaskType, SortOrder
from pyatlan.model.fluent_tasks import FluentTasks
from pyatlan.model.search import SortItem
from pyatlan.model.task import AtlanTask, TaskSearchRequest
Expand Down Expand Up @@ -39,7 +34,9 @@ def snowflake_column_qn(snowflake_conn):


@pytest.fixture()
def snowflake_column(client: AtlanClient, snowflake_column_qn) -> Column:
def snowflake_column(
client: AtlanClient, snowflake_column_qn
) -> Generator[Column, None, None]:
client.asset.add_atlan_tags(
asset_type=Column,
qualified_name=snowflake_column_qn,
Expand All @@ -51,7 +48,13 @@ def snowflake_column(client: AtlanClient, snowflake_column_qn) -> Column:
snowflake_column = client.asset.get_by_qualified_name(
snowflake_column_qn, asset_type=Column
)
return snowflake_column
yield snowflake_column

client.asset.remove_atlan_tag(
asset_type=Column,
qualified_name=snowflake_column_qn,
atlan_tag_name=TAG_NAME,
)


@pytest.fixture()
Expand All @@ -72,19 +75,8 @@ def task_search_request(snowflake_column: Column) -> TaskSearchRequest:


@pytest.fixture(scope="module")
def atlan_tag_def(
client: AtlanClient,
snowflake_column_qn,
) -> Generator[AtlanTagDef, None, None]:
atlan_tag_def = AtlanTagDef.create(name=TAG_NAME, color=AtlanTagColor.GREEN)
typedef = client.typedef.create(atlan_tag_def)
yield typedef.atlan_tag_defs[0]
client.asset.remove_atlan_tag(
asset_type=Column,
qualified_name=snowflake_column_qn,
atlan_tag_name=TAG_NAME,
)
client.typedef.purge(TAG_NAME, typedef_type=AtlanTagDef)
def atlan_tag_def(make_atlan_tag) -> AtlanTagDef:
return make_atlan_tag(TAG_NAME)


def test_task_search(
Expand Down

0 comments on commit 12883f7

Please sign in to comment.