diff --git a/HISTORY.md b/HISTORY.md index 9c61e6559..e0398cc50 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,9 @@ +## 0.0.22 (Apr 12, 2023) + +* Added create method to Column +* Updated Schema.create method to add schema to Database schemas collection +* Updated Table.create method to add table to Schema tables collection + ## 0.0.21 (Apr 11, 2023) * Added relation_attributes parameter to IndexSearchRequest to specify the attributes to be included in each relationship that is included in the results of the search diff --git a/pyatlan/generator/templates/entity.jinja2 b/pyatlan/generator/templates/entity.jinja2 index 837fccdac..4f5564071 100644 --- a/pyatlan/generator/templates/entity.jinja2 +++ b/pyatlan/generator/templates/entity.jinja2 @@ -447,10 +447,14 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes {%- elif entity_def.name == "Schema" %} @classmethod # @validate_arguments() - def create(cls, *, name: str, database_qualified_name: str)->{{ entity_def.name }}.Attributes: + def create( + cls, *, name: str, database_qualified_name: str + ) -> Schema.Attributes: if not name: raise ValueError("name cannot be blank") - validate_required_fields(["database_qualified_name"], [database_qualified_name]) + validate_required_fields( + ["database_qualified_name"], [database_qualified_name] + ) fields = database_qualified_name.split("/") if len(fields) != 4: raise ValueError("Invalid database_qualified_name") @@ -465,11 +469,12 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes database_qualified_name=database_qualified_name, qualified_name=f"{database_qualified_name}/{name}", connector_name=connector_type.value, + database=Database.ref_by_qualified_name(database_qualified_name), ) {%- elif entity_def.name == "Table" or entity_def.name == "View" %} @classmethod # @validate_arguments() - def create(cls, *, name: str, schema_qualified_name: str)->{{ entity_def.name }}.Attributes: + def create(cls, *, name: str, schema_qualified_name: str) -> {{ entity_def.name }}.Attributes: if not name: raise ValueError("name cannot be blank") validate_required_fields(["schema_qualified_name"], [schema_qualified_name]) @@ -489,7 +494,65 @@ class {{ entity_def.name }}({{super_classes[0]}} {%- if "Asset" in super_classes schema_qualified_name=schema_qualified_name, schema_name=fields[4], connector_name=connector_type.value, + atlan_schema=Schema.ref_by_qualified_name(schema_qualified_name), ) + {%- elif entity_def.name == "Column" %} + @classmethod + # @validate_arguments() + def create( + cls, *, name: str, parent_qualified_name: str, parent_type: type, order: int + ) -> Column.Attributes: + if not name: + raise ValueError("name cannot be blank") + validate_required_fields(["parent_qualified_name"], [parent_qualified_name]) + fields = parent_qualified_name.split("/") + if len(fields) != 6: + raise ValueError("Invalid parent_qualified_name") + try: + connector_type = AtlanConnectorType(fields[1]) # type:ignore + except ValueError as e: + raise ValueError("Invalid parent_qualified_name") from e + ret_value = Column.Attributes( + name=name, + qualified_name=f"{parent_qualified_name}/{name}", + connector_name=connector_type.value, + schema_name=fields[4], + schema_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}/{fields[3]}/{fields[4]}", + database_name=fields[3], + database_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}/{fields[3]}", + connection_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}", + order=order, + ) + if parent_type == Table: + ret_value.table_qualified_name = parent_qualified_name + ret_value.table = Table.ref_by_qualified_name(parent_qualified_name) + elif parent_type == View: + ret_value.view_qualified_name = parent_qualified_name + ret_value.view = View.ref_by_qualified_name(parent_qualified_name) + elif parent_type == MaterialisedView: + ret_value.view_qualified_name = parent_qualified_name + ret_value.materialised_view = MaterialisedView.ref_by_qualified_name( + parent_qualified_name + ) + else: + raise ValueError( + f"parent_type must be either Table, View or MaterializeView" + ) + return ret_value + + @classmethod + # @validate_arguments() + def create( + cls, *, name: str, parent_qualified_name: str, parent_type: type, order: int + ) -> Column: + return Column( + attributes=Column.Attributes.create( + name=name, + parent_qualified_name=parent_qualified_name, + parent_type=parent_type, + order=order, + ) + ) {%- elif entity_def.name == "S3Bucket" %} @classmethod # @validate_arguments() diff --git a/pyatlan/model/assets.py b/pyatlan/model/assets.py index e77ccca14..8da0bcb78 100644 --- a/pyatlan/model/assets.py +++ b/pyatlan/model/assets.py @@ -9098,6 +9098,7 @@ def create(cls, *, name: str, schema_qualified_name: str) -> Table.Attributes: schema_qualified_name=schema_qualified_name, schema_name=fields[4], connector_name=connector_type.value, + atlan_schema=Schema.ref_by_qualified_name(schema_qualified_name), ) attributes: "Table.Attributes" = Field( @@ -10021,6 +10022,63 @@ class Attributes(SQL.Attributes): None, description="", alias="columnDbtModelColumns" ) # relationship + @classmethod + # @validate_arguments() + def create( + cls, *, name: str, parent_qualified_name: str, parent_type: type, order: int + ) -> Column.Attributes: + if not name: + raise ValueError("name cannot be blank") + validate_required_fields(["parent_qualified_name"], [parent_qualified_name]) + fields = parent_qualified_name.split("/") + if len(fields) != 6: + raise ValueError("Invalid parent_qualified_name") + try: + connector_type = AtlanConnectorType(fields[1]) # type:ignore + except ValueError as e: + raise ValueError("Invalid parent_qualified_name") from e + ret_value = Column.Attributes( + name=name, + qualified_name=f"{parent_qualified_name}/{name}", + connector_name=connector_type.value, + schema_name=fields[4], + schema_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}/{fields[3]}/{fields[4]}", + database_name=fields[3], + database_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}/{fields[3]}", + connection_qualified_name=f"{fields[0]}/{fields[1]}/{fields[2]}", + order=order, + ) + if parent_type == Table: + ret_value.table_qualified_name = parent_qualified_name + ret_value.table = Table.ref_by_qualified_name(parent_qualified_name) + elif parent_type == View: + ret_value.view_qualified_name = parent_qualified_name + ret_value.view = View.ref_by_qualified_name(parent_qualified_name) + elif parent_type == MaterialisedView: + ret_value.view_qualified_name = parent_qualified_name + ret_value.materialised_view = MaterialisedView.ref_by_qualified_name( + parent_qualified_name + ) + else: + raise ValueError( + "parent_type must be either Table, View or MaterializeView" + ) + return ret_value + + @classmethod + # @validate_arguments() + def create( + cls, *, name: str, parent_qualified_name: str, parent_type: type, order: int + ) -> Column: + return Column( + attributes=Column.Attributes.create( + name=name, + parent_qualified_name=parent_qualified_name, + parent_type=parent_type, + order=order, + ) + ) + attributes: "Column.Attributes" = Field( None, description="Map of attributes in the instance and their values. The specific keys of this map will vary by " @@ -10148,6 +10206,7 @@ def create( database_qualified_name=database_qualified_name, qualified_name=f"{database_qualified_name}/{name}", connector_name=connector_type.value, + database=Database.ref_by_qualified_name(database_qualified_name), ) attributes: "Schema.Attributes" = Field( @@ -10785,6 +10844,7 @@ def create(cls, *, name: str, schema_qualified_name: str) -> View.Attributes: schema_qualified_name=schema_qualified_name, schema_name=fields[4], connector_name=connector_type.value, + atlan_schema=Schema.ref_by_qualified_name(schema_qualified_name), ) attributes: "View.Attributes" = Field( diff --git a/pyatlan/model/enums.py b/pyatlan/model/enums.py index 5f7478cf3..4d610df4c 100644 --- a/pyatlan/model/enums.py +++ b/pyatlan/model/enums.py @@ -198,6 +198,17 @@ class AtlanConnectionCategory(Enum): class AtlanConnectorType(str, Enum): category: AtlanConnectionCategory + @classmethod + def _get_connector_type_from_qualified_name( + cls, qualified_name: str + ) -> "AtlanConnectorType": + tokens = qualified_name.split("/") + if len(tokens) > 1: + return AtlanConnectorType[tokens[1].upper()] + raise ValueError( + f"Could not determine AtlanConnectorType from {qualified_name}" + ) + def __new__( cls, value: str, category: AtlanConnectionCategory ) -> "AtlanConnectorType": diff --git a/pyatlan/version.txt b/pyatlan/version.txt index 236c7ad08..818944f5b 100644 --- a/pyatlan/version.txt +++ b/pyatlan/version.txt @@ -1 +1 @@ -0.0.21 +0.0.22 diff --git a/tests/integration/test_entity_model.py b/tests/integration/test_entity_model.py index 584c420f3..54e89d258 100644 --- a/tests/integration/test_entity_model.py +++ b/tests/integration/test_entity_model.py @@ -3,7 +3,6 @@ import os import random import string -import time import pytest import requests @@ -42,6 +41,7 @@ "4af8d57c-61ef-4b57-983c-eff20e6d08b5", "57f5463d-cc2a-4859-bf28-e4fa52002e8e", } +TEMP_CONNECTION_GUID = "b3a5c49a-0c7c-4e66-8453-f4da8d9ce222" @pytest.fixture(scope="module") @@ -85,13 +85,13 @@ def get_environment_variable(name) -> str: return ret_value -@pytest.fixture() +@pytest.fixture(scope="session") def increment_counter(): - i = random.randint(0, 1000) + i = 700 def increment(): nonlocal i - i += 1 + i += 20 return i return increment @@ -187,6 +187,7 @@ def cleanup(atlan_host, headers, atlan_api_key): "Database", "Connection", "View", + "Column", ] for type_name in type_names: print() @@ -523,23 +524,23 @@ def test_create_connection(client: AtlanClient, increment_counter): assert c.guid == guid -@pytest.mark.skip("Connection creation is still intermittently failing") def test_create_database(client: AtlanClient, increment_counter): role = RoleCache.get_id_for_name("$admin") assert role suffix = increment_counter() - connection = Connection.create( - name=f"Integration {suffix}", - connector_type=AtlanConnectorType.SNOWFLAKE, - admin_roles=[role], - admin_groups=["admin"], - ) - response = client.upsert(connection) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Connection) - connection = response.mutated_entities.CREATE[0] - connection = client.get_asset_by_guid(connection.guid, Connection) + # connection = Connection.create( + # name=f"Integration {suffix}", + # connector_type=AtlanConnectorType.SNOWFLAKE, + # admin_roles=[role], + # admin_groups=["admin"], + # ) + # response = client.upsert(connection) + # assert response.mutated_entities + # assert response.mutated_entities.CREATE + # assert isinstance(response.mutated_entities.CREATE[0], Connection) + # connection = response.mutated_entities.CREATE[0] + # connection = client.get_asset_by_guid(connection.guid, Connection) + connection = client.get_asset_by_guid(TEMP_CONNECTION_GUID, Connection) database = Database.create( name=f"Integration_{suffix}", connection_qualified_name=connection.attributes.qualified_name, @@ -550,122 +551,98 @@ def test_create_database(client: AtlanClient, increment_counter): assert len(response.mutated_entities.CREATE) == 1 assert isinstance(response.mutated_entities.CREATE[0], Database) assert response.guid_assignments - assert database.guid in response.guid_assignments - guid = response.guid_assignments[database.guid] database = response.mutated_entities.CREATE[0] - assert guid == database.guid - database = client.get_asset_by_guid(guid, Database) - assert isinstance(database, Database) - assert guid == database.guid + client.get_asset_by_guid(database.guid, Database) -@pytest.mark.skip("Connection creation is still intermittently failing") def test_create_schema(client: AtlanClient, increment_counter): role = RoleCache.get_id_for_name("$admin") assert role suffix = increment_counter() - connection = Connection.create( - name=f"Integration {suffix}", - connector_type=AtlanConnectorType.SNOWFLAKE, - admin_roles=[role], - admin_groups=["admin"], - ) - response = client.upsert(connection) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Connection) - connection = response.mutated_entities.CREATE[0] - time.sleep(30) - connection = client.get_asset_by_guid(connection.guid, Connection) + # connection = Connection.create( + # name=f"Integration {suffix}", + # connector_type=AtlanConnectorType.SNOWFLAKE, + # admin_roles=[role], + # admin_groups=["admin"], + # ) + # response = client.upsert(connection) + # assert response.mutated_entities + # assert response.mutated_entities.CREATE + # assert isinstance(response.mutated_entities.CREATE[0], Connection) + # connection = response.mutated_entities.CREATE[0] + # time.sleep(30) + connection = client.get_asset_by_guid(TEMP_CONNECTION_GUID, Connection) database = Database.create( name=f"Integration_{suffix}", connection_qualified_name=connection.attributes.qualified_name, ) response = client.upsert(database) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Database) - database = response.mutated_entities.CREATE[0] - time.sleep(3) - database = client.get_asset_by_guid(database.guid, Database) + assert (databases := response.assets_created(asset_type=Database)) + assert len(databases) == 1 + database = client.get_asset_by_guid(databases[0].guid, Database) schema = Schema.create( name=f"Integration_{suffix}", database_qualified_name=database.attributes.qualified_name, ) response = client.upsert(schema) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert len(response.mutated_entities.CREATE) == 1 - assert isinstance(response.mutated_entities.CREATE[0], Schema) - assert response.guid_assignments - assert schema.guid in response.guid_assignments - guid = response.guid_assignments[schema.guid] - schema = response.mutated_entities.CREATE[0] - assert guid == schema.guid - time.sleep(3) - schema = client.get_asset_by_guid(guid, Schema) - assert isinstance(schema, Schema) - assert guid == schema.guid + assert (schemas := response.assets_created(asset_type=Schema)) + assert len(schemas) == 1 + schema = client.get_asset_by_guid(schemas[0].guid, Schema) + assert (databases := response.assets_updated(asset_type=Database)) + assert len(databases) == 1 + database = client.get_asset_by_guid(databases[0].guid, Database) + assert database.attributes.schemas + schemas = database.attributes.schemas + assert len(schemas) == 1 + assert schemas[0].guid == schema.guid -@pytest.mark.skip("Connection creation is still intermittently failing") def test_create_table(client: AtlanClient, increment_counter): role = RoleCache.get_id_for_name("$admin") assert role suffix = increment_counter() - connection = Connection.create( - name=f"Integration {suffix}", - connector_type=AtlanConnectorType.SNOWFLAKE, - admin_roles=[role], - admin_groups=["admin"], - ) - response = client.upsert(connection) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Connection) - connection = response.mutated_entities.CREATE[0] - time.sleep(30) - connection = client.get_asset_by_guid(connection.guid, Connection) + # connection = Connection.create( + # name=f"Integration {suffix}", + # connector_type=AtlanConnectorType.SNOWFLAKE, + # admin_roles=[role], + # admin_groups=["admin"], + # ) + # response = client.upsert(connection) + # assert response.mutated_entities + # assert response.mutated_entities.CREATE + # assert isinstance(response.mutated_entities.CREATE[0], Connection) + # connection = response.mutated_entities.CREATE[0] + # time.sleep(30) + connection = client.get_asset_by_guid(TEMP_CONNECTION_GUID, Connection) database = Database.create( name=f"Integration_{suffix}", connection_qualified_name=connection.attributes.qualified_name, ) response = client.upsert(database) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Database) - database = response.mutated_entities.CREATE[0] - time.sleep(3) - database = client.get_asset_by_guid(database.guid, Database) + assert (databases := response.assets_created(asset_type=Database)) + database = client.get_asset_by_guid(databases[0].guid, Database) schema = Schema.create( name=f"Integration_{suffix}", database_qualified_name=database.attributes.qualified_name, ) response = client.upsert(schema) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert isinstance(response.mutated_entities.CREATE[0], Schema) - schema = response.mutated_entities.CREATE[0] - time.sleep(3) - schema = client.get_asset_by_guid(schema.guid, Schema) + assert (schemas := response.assets_created(asset_type=Schema)) + schema = client.get_asset_by_guid(schemas[0].guid, Schema) table = Table.create( name=f"Integration_{suffix}", schema_qualified_name=schema.attributes.qualified_name, ) response = client.upsert(table) - assert response.mutated_entities - assert response.mutated_entities.CREATE - assert len(response.mutated_entities.CREATE) == 1 - assert isinstance(response.mutated_entities.CREATE[0], Table) - assert response.guid_assignments - assert table.guid in response.guid_assignments - guid = response.guid_assignments[table.guid] - table = response.mutated_entities.CREATE[0] - assert guid == table.guid - time.sleep(3) - table = client.get_asset_by_guid(guid, Table) - assert isinstance(table, Table) - assert guid == table.guid + assert (tables := response.assets_created(asset_type=Table)) + assert len(tables) == 1 + table = client.get_asset_by_guid(guid=tables[0].guid, asset_type=Table) + assert (schemas := response.assets_updated(asset_type=Schema)) + assert len(schemas) == 1 + schema = client.get_asset_by_guid(guid=schemas[0].guid, asset_type=Schema) + assert schema.attributes.tables + tables = schema.attributes.tables + assert len(tables) == 1 + assert tables[0].guid == table.guid def test_get_by_qualified_name(client: AtlanClient): @@ -694,6 +671,61 @@ def test_create_view(client: AtlanClient, increment_counter): assert guid == view.guid +def test_create_column(client: AtlanClient, increment_counter): + role = RoleCache.get_id_for_name("$admin") + assert role + suffix = increment_counter() + # connection = Connection.create( + # name=f"Integration {suffix}", + # connector_type=AtlanConnectorType.SNOWFLAKE, + # admin_roles=[role], + # admin_groups=["admin"], + # ) + # response = client.upsert(connection) + # assert response.mutated_entities + # assert response.mutated_entities.CREATE + # assert isinstance(response.mutated_entities.CREATE[0], Connection) + # connection = response.mutated_entities.CREATE[0] + # time.sleep(30) + connection = client.get_asset_by_guid(TEMP_CONNECTION_GUID, Connection) + database = Database.create( + name=f"Integration_{suffix}", + connection_qualified_name=connection.attributes.qualified_name, + ) + response = client.upsert(database) + assert (databases := response.assets_created(asset_type=Database)) + database = client.get_asset_by_guid(databases[0].guid, Database) + schema = Schema.create( + name=f"Integration_{suffix}", + database_qualified_name=database.attributes.qualified_name, + ) + response = client.upsert(schema) + assert (schemas := response.assets_created(asset_type=Schema)) + schema = client.get_asset_by_guid(schemas[0].guid, Schema) + table = Table.create( + name=f"Integration_{suffix}", + schema_qualified_name=schema.attributes.qualified_name, + ) + response = client.upsert(table) + assert (tables := response.assets_created(asset_type=Table)) + table = client.get_asset_by_guid(guid=tables[0].guid, asset_type=Table) + column = Column.create( + name=f"Integration_{suffix}_column", + parent_qualified_name=table.qualified_name, + parent_type=Table, + order=1, + ) + response = client.upsert(column) + assert (columns := response.assets_created(asset_type=Column)) + assert len(columns) == 1 + column = client.get_asset_by_guid(asset_type=Column, guid=columns[0].guid) + table = client.get_asset_by_guid(asset_type=Table, guid=table.guid) + assert table.attributes.columns + columns = table.attributes.columns + assert len(columns) == 1 + assert columns[0].guid == column.guid + + def test_add_and_remove_classifications(client: AtlanClient): glossary = AtlasGlossary.create(name="Integration Classification Test") glossary.attributes.user_description = "This is a description of the glossary"