diff --git a/tests/integration/test_index_search.py b/tests/integration/test_index_search.py index 2aa9d7b05..6cad614e1 100644 --- a/tests/integration/test_index_search.py +++ b/tests/integration/test_index_search.py @@ -37,10 +37,10 @@ NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d") EXISTING_TAG = "Issue" EXISTING_SOURCE_SYNCED_TAG = "Confidential" -COLUMN_WITH_CUSTOM_ATTRIBUTES = ( - "default/snowflake/1733440936/ANALYTICS" - "/WIDE_WORLD_IMPORTERS/STG_STATE_PROVINCES/LATEST_RECORDED_POPULATION" -) +DB_NAME = "ANALYTICS" +TABLE_NAME = "STG_STATE_PROVINCES" +COLUMN_NAME = "LATEST_RECORDED_POPULATION" +SCHEMA_NAME = "WIDE_WORLD_IMPORTERS" VALUES_FOR_TERM_QUERIES = { "with_categories": "VBsYc9dUoEcAtDxZmjby6@mweSfpXBwfYWedQTvA3Gi", @@ -94,6 +94,18 @@ } +@pytest.fixture(scope="module") +def snowflake_conn(client: AtlanClient): + return client.asset.find_connections_by_name( + "development", AtlanConnectorType.SNOWFLAKE + )[0] + + +@pytest.fixture(scope="module") +def snowflake_column_qn(snowflake_conn): + return f"{snowflake_conn.qualified_name}/{DB_NAME}/{SCHEMA_NAME}/{TABLE_NAME}/{COLUMN_NAME}" + + @dataclass() class AssetTracker: missing_types: Set[str] = field(default_factory=set) @@ -247,12 +259,13 @@ def test_source_tag_assign_with_value(client: AtlanClient, table: Table): _assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Not Restricted") -def test_search_source_specific_custom_attributes(client: AtlanClient): +def test_search_source_specific_custom_attributes( + client: AtlanClient, snowflake_column_qn: str +): # Test with get_by_qualified_name() - column_qn = COLUMN_WITH_CUSTOM_ATTRIBUTES asset = client.asset.get_by_qualified_name( asset_type=Column, - qualified_name=column_qn, + qualified_name=snowflake_column_qn, min_ext_info=True, ignore_relationships=True, ) @@ -262,7 +275,7 @@ def test_search_source_specific_custom_attributes(client: AtlanClient): results = ( FluentSearch() .where(CompoundQuery.active_assets()) - .where(Column.QUALIFIED_NAME.eq(column_qn)) + .where(Column.QUALIFIED_NAME.eq(snowflake_column_qn)) .include_on_results(Column.CUSTOM_ATTRIBUTES) .execute(client=client) )