diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py index 05285e0d5..242e6d24c 100644 --- a/dbt/adapters/databricks/relation_configs/base.py +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from pydantic import BaseModel, ConfigDict -from typing import ClassVar, Dict, Generic, Optional, TypeVar +from typing import ClassVar, Dict, Generic, List, Optional, TypeVar from typing_extensions import Self, Type from dbt.adapters.relation_configs.config_base import RelationResults @@ -40,25 +40,12 @@ def get_diff(self, other: Self) -> Optional[Self]: return None -class RelationChange(BaseModel): - model_config = ConfigDict(frozen=True) - data: DatabricksComponentConfig - requires_full_refresh: bool - - class DatabricksRelationChangeSet(BaseModel): """Class for encapsulating the changes that need to be applied to a Databricks relation.""" model_config = ConfigDict(frozen=True) - changes: Dict[str, RelationChange] - - @property - def requires_full_refresh(self) -> bool: - """Whether or not the relation that is to be configured by this change set requires a full - refresh. - """ - - return any(change.requires_full_refresh for change in self.changes.values()) + changes: Dict[str, DatabricksComponentConfig] + requires_full_refresh: bool = False @property def has_changes(self) -> bool: @@ -109,7 +96,7 @@ class DatabricksRelationConfigBase(BaseModel, ABC): # The list of components that make up the relation config. In the base implemenation, these # components are applied sequentially to either the existing relation, or the model node, to # build up the config. - config_components: ClassVar[Dict[Type[DatabricksComponentProcessor], bool]] + config_components: ClassVar[List[Type[DatabricksComponentProcessor]]] config: Dict[str, DatabricksComponentConfig] @classmethod @@ -117,7 +104,7 @@ def from_model_node(cls, model_node: ModelNode) -> Self: """Build the relation config from a model node.""" config_dict: Dict[str, DatabricksComponentConfig] = {} - for component in cls.config_components.keys(): + for component in cls.config_components: model_component = component.from_model_node(model_node) if model_component: config_dict[component.name] = model_component @@ -136,20 +123,11 @@ def from_results(cls, results: RelationResults) -> Self: return cls(config=config_dict) + @abstractmethod def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]: """Get the changeset that must be applied to the existing relation to make it match the - current state of the dbt project. + current state of the dbt project. If no changes are required, this method should return + None. """ - changes = {} - - for component, requires_refresh in self.config_components.items(): - key = component.name - value = self.config[key] - diff = value.get_diff(existing.config[key]) - if diff: - changes[key] = RelationChange(data=diff, requires_full_refresh=requires_refresh) - - if len(changes) > 0: - return DatabricksRelationChangeSet(changes=changes) - return None + raise NotImplementedError("Must be implemented by subclass") diff --git a/dbt/adapters/databricks/relation_configs/materialized_view.py b/dbt/adapters/databricks/relation_configs/materialized_view.py index d69a1d989..4789ede7f 100644 --- a/dbt/adapters/databricks/relation_configs/materialized_view.py +++ b/dbt/adapters/databricks/relation_configs/materialized_view.py @@ -1,4 +1,7 @@ +from typing import Dict, Optional from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksRelationChangeSet, DatabricksRelationConfigBase, ) from dbt.adapters.databricks.relation_configs.comment import ( @@ -19,10 +22,30 @@ class MaterializedViewConfig(DatabricksRelationConfigBase): - config_components = { - PartitionedByProcessor: True, - CommentProcessor: True, - TblPropertiesProcessor: True, - RefreshProcessor: False, - QueryProcessor: True, - } + config_components = [ + PartitionedByProcessor, + CommentProcessor, + TblPropertiesProcessor, + RefreshProcessor, + QueryProcessor, + ] + + def get_changeset( + self, existing: "MaterializedViewConfig" + ) -> Optional[DatabricksRelationChangeSet]: + changes: Dict[str, DatabricksComponentConfig] = {} + requires_refresh = False + + for component in self.config_components: + key = component.name + value = self.config[key] + diff = value.get_diff(existing.config[key]) + if diff: + requires_refresh = requires_refresh or key != "refresh" + changes[key] = diff + + if len(changes) > 0: + return DatabricksRelationChangeSet( + changes=changes, requires_full_refresh=requires_refresh + ) + return None diff --git a/dbt/adapters/databricks/relation_configs/streaming_table.py b/dbt/adapters/databricks/relation_configs/streaming_table.py index b4369c687..e7e24b995 100644 --- a/dbt/adapters/databricks/relation_configs/streaming_table.py +++ b/dbt/adapters/databricks/relation_configs/streaming_table.py @@ -1,8 +1,8 @@ -from typing import Optional +from typing import Dict, Optional from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, DatabricksRelationChangeSet, DatabricksRelationConfigBase, - RelationChange, ) from dbt.adapters.databricks.relation_configs.comment import ( CommentProcessor, @@ -20,12 +20,12 @@ class StreamingTableConfig(DatabricksRelationConfigBase): - config_components = { - PartitionedByProcessor: True, - CommentProcessor: False, - TblPropertiesProcessor: False, - RefreshProcessor: False, - } + config_components = [ + PartitionedByProcessor, + CommentProcessor, + TblPropertiesProcessor, + RefreshProcessor, + ] def get_changeset( self, existing: "StreamingTableConfig" @@ -33,21 +33,18 @@ def get_changeset( """Get the changeset that must be applied to the existing relation to make it match the current state of the dbt project. """ + changes: Dict[str, DatabricksComponentConfig] = {} + requires_refresh = False - changes = {} - - for component, requires_refresh in self.config_components.items(): + for component in self.config_components: key = component.name value = self.config[key] diff = value.get_diff(existing.config[key]) - if key == "partition_by": - changes[key] = RelationChange(data=value, requires_full_refresh=diff is not None) - else: - if not diff: - diff = value - if diff != RefreshConfig(): - changes[key] = RelationChange(data=diff, requires_full_refresh=requires_refresh) + if key == "partition_by" and diff is not None: + requires_refresh = True + diff = diff or value + + if diff != RefreshConfig(): + changes[key] = diff - if len(changes) > 0: - return DatabricksRelationChangeSet(changes=changes) - return None + return DatabricksRelationChangeSet(changes=changes, requires_full_refresh=requires_refresh) diff --git a/dbt/include/databricks/macros/relations/components/refresh_schedule.sql b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql index 065a99e60..618c4a3c3 100644 --- a/dbt/include/databricks/macros/relations/components/refresh_schedule.sql +++ b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql @@ -1,6 +1,4 @@ {% macro get_create_sql_refresh_schedule(cron, time_zone_value) %} - {{ log("cron: " ~ cron) }} - {{ log("time_zone_value: " ~ time_zone_value) }} {%- if cron -%} SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} {%- endif -%} diff --git a/dbt/include/databricks/macros/relations/materialized_view/alter.sql b/dbt/include/databricks/macros/relations/materialized_view/alter.sql index 669695e89..e6b3d68e8 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/alter.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/alter.sql @@ -44,7 +44,7 @@ {% endmacro %} {% macro get_alter_mv_internal(relation, configuration_changes) %} - {%- set refresh = configuration_changes.changes["refresh"].data -%} + {%- set refresh = configuration_changes.changes["refresh"] -%} -- Currently only schedule can be altered ALTER MATERIALIZED VIEW {{ relation }} {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}} diff --git a/dbt/include/databricks/macros/relations/streaming_table/alter.sql b/dbt/include/databricks/macros/relations/streaming_table/alter.sql index 2f6ae9997..917a23cfb 100644 --- a/dbt/include/databricks/macros/relations/streaming_table/alter.sql +++ b/dbt/include/databricks/macros/relations/streaming_table/alter.sql @@ -58,27 +58,26 @@ {% endmacro %} {% macro get_create_st_internal(relation, configuration_changes, sql) %} - {%- set partition_by = configuration_changes.changes["partition_by"] -%} - {%- set tblproperties = configuration_changes.changes["tblproperties"] -%} - {%- set comment = configuration_changes.changes["comment"] -%} - {%- set refresh = configuration_changes.changes["refresh"] -%} + {%- set partition_by = configuration_changes.changes["partition_by"].partition_by -%} + {%- set tblproperties = configuration_changes.changes["tblproperties"].tblproperties -%} + {%- set comment = configuration_changes.changes["comment"].comment -%} CREATE OR REFRESH STREAMING TABLE {{ relation }} {% if partition_by -%} - {{ get_create_sql_partition_by(partition_by.data.partition_by) }} + {{ get_create_sql_partition_by(partition_by) }} {%- endif %} {% if comment -%} - {{ get_create_sql_comment(comment.data.comment) }} + {{ get_create_sql_comment(comment) }} {%- endif %} {% if tblproperties -%} - {{ get_create_sql_tblproperties(tblproperties.data.tblproperties) }} + {{ get_create_sql_tblproperties(tblproperties) }} {%- endif %} AS {{ sql }} {% endmacro %} {% macro get_alter_st_internal(relation, configuration_changes) %} {%- set refresh = configuration_changes.changes["refresh"] -%} - {%- if refresh and refresh.data.cron -%} + {%- if refresh and refresh.cron -%} ALTER STREAMING TABLE {{ relation }} - {{ get_alter_sql_refresh_schedule(refresh.data.cron, refresh.data.time_zone_value, False) -}} + {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, False) -}} {%- endif -%} {% endmacro %} \ No newline at end of file diff --git a/tests/unit/relation_configs/test_config_base.py b/tests/unit/relation_configs/test_config_base.py deleted file mode 100644 index c03c90a07..000000000 --- a/tests/unit/relation_configs/test_config_base.py +++ /dev/null @@ -1,55 +0,0 @@ -from dbt.adapters.databricks.relation_configs.base import ( - DatabricksComponentConfig, - DatabricksRelationChangeSet, - RelationChange, -) - -from dbt.adapters.databricks.relation_configs.comment import CommentConfig -from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig - - -class MockComponentConfig(DatabricksComponentConfig): - data: int = 1 - - @property - def requires_full_refresh(self) -> bool: - return True - - -class TestDatabricksComponentConfig: - def test_get_diff__valid_type(self): - config = MockComponentConfig() - other = MockComponentConfig(data=2) - - assert config.get_diff(other) == config - - -class TestDatabricksRelationChangeSet: - def test_requires_full_refresh__no_changes(self): - changeset = DatabricksRelationChangeSet(changes={}) - assert not changeset.requires_full_refresh - - def test_requires_full_refresh__has_only_alterable_changes(self): - changeset = DatabricksRelationChangeSet( - changes={"refresh": RelationChange(data=RefreshConfig(), requires_full_refresh=False)} - ) - assert not changeset.requires_full_refresh - - def test_requires_full_refresh__has_an_inalterable_change(self): - changeset = DatabricksRelationChangeSet( - changes={ - "comment": RelationChange(data=CommentConfig(), requires_full_refresh=True), - "refresh": RelationChange(data=RefreshConfig(), requires_full_refresh=False), - } - ) - assert changeset.requires_full_refresh - - def test_has_changes__no_changes(self): - changeset = DatabricksRelationChangeSet(changes={}) - assert not changeset.has_changes - - def test_has_changes__has_changes(self): - changeset = DatabricksRelationChangeSet( - changes={"refresh": RelationChange(data=RefreshConfig(), requires_full_refresh=False)} - ) - assert changeset.has_changes diff --git a/tests/unit/relation_configs/test_materialized_view_config.py b/tests/unit/relation_configs/test_materialized_view_config.py index 0025b8c67..4d10a72a7 100644 --- a/tests/unit/relation_configs/test_materialized_view_config.py +++ b/tests/unit/relation_configs/test_materialized_view_config.py @@ -1,6 +1,5 @@ from agate import Table, Row from mock import Mock -from dbt.adapters.databricks.relation_configs.base import RelationChange from dbt.adapters.databricks.relation_configs.comment import CommentConfig from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig from dbt.adapters.databricks.relation_configs.partitioning import PartitionedByConfig @@ -115,10 +114,6 @@ def test_get_changeset__some_changes(self): assert changeset.has_changes assert changeset.requires_full_refresh assert changeset.changes == { - "partition_by": RelationChange( - data=PartitionedByConfig(partition_by=["col_a"]), requires_full_refresh=True - ), - "refresh": RelationChange( - data=RefreshConfig(cron="*/5 * * * *"), requires_full_refresh=False - ), + "partition_by": PartitionedByConfig(partition_by=["col_a"]), + "refresh": RefreshConfig(cron="*/5 * * * *"), } diff --git a/tests/unit/relation_configs/test_streaming_table_config.py b/tests/unit/relation_configs/test_streaming_table_config.py index d12365dd1..1f88e6219 100644 --- a/tests/unit/relation_configs/test_streaming_table_config.py +++ b/tests/unit/relation_configs/test_streaming_table_config.py @@ -1,6 +1,5 @@ from agate import Table from mock import Mock -from dbt.adapters.databricks.relation_configs.base import RelationChange from dbt.adapters.databricks.relation_configs.comment import CommentConfig from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig from dbt.adapters.databricks.relation_configs.partitioning import PartitionedByConfig @@ -84,17 +83,9 @@ def test_get_changeset__no_changes(self): changeset = new.get_changeset(old) assert not changeset.requires_full_refresh assert changeset.changes == { - "tblproperties": RelationChange( - data=TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), - requires_full_refresh=False, - ), - "comment": RelationChange( - data=CommentConfig(comment="This is the table comment"), requires_full_refresh=False - ), - "partition_by": RelationChange( - data=PartitionedByConfig(partition_by=["col_a", "col_b"]), - requires_full_refresh=False, - ), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "comment": CommentConfig(comment="This is the table comment"), + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), } def test_get_changeset__some_changes(self): @@ -119,17 +110,8 @@ def test_get_changeset__some_changes(self): assert changeset.has_changes assert changeset.requires_full_refresh assert changeset.changes == { - "partition_by": RelationChange( - data=PartitionedByConfig(partition_by=["col_a"]), requires_full_refresh=True - ), - "comment": RelationChange( - data=CommentConfig(comment="This is the table comment"), requires_full_refresh=False - ), - "tblproperties": RelationChange( - data=TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), - requires_full_refresh=False, - ), - "refresh": RelationChange( - data=RefreshConfig(cron="*/5 * * * *"), requires_full_refresh=False - ), + "partition_by": PartitionedByConfig(partition_by=["col_a"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(cron="*/5 * * * *"), }