Skip to content

Commit

Permalink
streamlined
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Jan 27, 2024
1 parent eae525d commit 28a019a
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 158 deletions.
40 changes: 9 additions & 31 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from pydantic import BaseModel, ConfigDict
from typing import ClassVar, Dict, Generic, Optional, TypeVar
from typing import ClassVar, Dict, Generic, List, Optional, TypeVar
from typing_extensions import Self, Type

from dbt.adapters.relation_configs.config_base import RelationResults
Expand Down Expand Up @@ -40,25 +40,12 @@ def get_diff(self, other: Self) -> Optional[Self]:
return None


class RelationChange(BaseModel):
model_config = ConfigDict(frozen=True)
data: DatabricksComponentConfig
requires_full_refresh: bool


class DatabricksRelationChangeSet(BaseModel):
"""Class for encapsulating the changes that need to be applied to a Databricks relation."""

model_config = ConfigDict(frozen=True)
changes: Dict[str, RelationChange]

@property
def requires_full_refresh(self) -> bool:
"""Whether or not the relation that is to be configured by this change set requires a full
refresh.
"""

return any(change.requires_full_refresh for change in self.changes.values())
changes: Dict[str, DatabricksComponentConfig]
requires_full_refresh: bool = False

@property
def has_changes(self) -> bool:
Expand Down Expand Up @@ -109,15 +96,15 @@ class DatabricksRelationConfigBase(BaseModel, ABC):
# The list of components that make up the relation config. In the base implemenation, these
# components are applied sequentially to either the existing relation, or the model node, to
# build up the config.
config_components: ClassVar[Dict[Type[DatabricksComponentProcessor], bool]]
config_components: ClassVar[List[Type[DatabricksComponentProcessor]]]
config: Dict[str, DatabricksComponentConfig]

@classmethod
def from_model_node(cls, model_node: ModelNode) -> Self:
"""Build the relation config from a model node."""

config_dict: Dict[str, DatabricksComponentConfig] = {}
for component in cls.config_components.keys():
for component in cls.config_components:
model_component = component.from_model_node(model_node)
if model_component:
config_dict[component.name] = model_component
Expand All @@ -136,20 +123,11 @@ def from_results(cls, results: RelationResults) -> Self:

return cls(config=config_dict)

@abstractmethod
def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]:
"""Get the changeset that must be applied to the existing relation to make it match the
current state of the dbt project.
current state of the dbt project. If no changes are required, this method should return
None.
"""

changes = {}

for component, requires_refresh in self.config_components.items():
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if diff:
changes[key] = RelationChange(data=diff, requires_full_refresh=requires_refresh)

if len(changes) > 0:
return DatabricksRelationChangeSet(changes=changes)
return None
raise NotImplementedError("Must be implemented by subclass")
37 changes: 30 additions & 7 deletions dbt/adapters/databricks/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from typing import Dict, Optional
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksRelationChangeSet,
DatabricksRelationConfigBase,
)
from dbt.adapters.databricks.relation_configs.comment import (
Expand All @@ -19,10 +22,30 @@


class MaterializedViewConfig(DatabricksRelationConfigBase):
config_components = {
PartitionedByProcessor: True,
CommentProcessor: True,
TblPropertiesProcessor: True,
RefreshProcessor: False,
QueryProcessor: True,
}
config_components = [
PartitionedByProcessor,
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
QueryProcessor,
]

def get_changeset(
self, existing: "MaterializedViewConfig"
) -> Optional[DatabricksRelationChangeSet]:
changes: Dict[str, DatabricksComponentConfig] = {}
requires_refresh = False

for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if diff:
requires_refresh = requires_refresh or key != "refresh"
changes[key] = diff

if len(changes) > 0:
return DatabricksRelationChangeSet(
changes=changes, requires_full_refresh=requires_refresh
)
return None
39 changes: 18 additions & 21 deletions dbt/adapters/databricks/relation_configs/streaming_table.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import Optional
from typing import Dict, Optional
from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksRelationChangeSet,
DatabricksRelationConfigBase,
RelationChange,
)
from dbt.adapters.databricks.relation_configs.comment import (
CommentProcessor,
Expand All @@ -20,34 +20,31 @@


class StreamingTableConfig(DatabricksRelationConfigBase):
config_components = {
PartitionedByProcessor: True,
CommentProcessor: False,
TblPropertiesProcessor: False,
RefreshProcessor: False,
}
config_components = [
PartitionedByProcessor,
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
]

def get_changeset(
self, existing: "StreamingTableConfig"
) -> Optional[DatabricksRelationChangeSet]:
"""Get the changeset that must be applied to the existing relation to make it match the
current state of the dbt project.
"""
changes: Dict[str, DatabricksComponentConfig] = {}
requires_refresh = False

changes = {}

for component, requires_refresh in self.config_components.items():
for component in self.config_components:
key = component.name
value = self.config[key]
diff = value.get_diff(existing.config[key])
if key == "partition_by":
changes[key] = RelationChange(data=value, requires_full_refresh=diff is not None)
else:
if not diff:
diff = value
if diff != RefreshConfig():
changes[key] = RelationChange(data=diff, requires_full_refresh=requires_refresh)
if key == "partition_by" and diff is not None:
requires_refresh = True
diff = diff or value

if diff != RefreshConfig():
changes[key] = diff

if len(changes) > 0:
return DatabricksRelationChangeSet(changes=changes)
return None
return DatabricksRelationChangeSet(changes=changes, requires_full_refresh=requires_refresh)
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %}
{{ log("cron: " ~ cron) }}
{{ log("time_zone_value: " ~ time_zone_value) }}
{%- if cron -%}
SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%}
{%- endif -%}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
{% endmacro %}

{% macro get_alter_mv_internal(relation, configuration_changes) %}
{%- set refresh = configuration_changes.changes["refresh"].data -%}
{%- set refresh = configuration_changes.changes["refresh"] -%}
-- Currently only schedule can be altered
ALTER MATERIALIZED VIEW {{ relation }}
{{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,27 +58,26 @@
{% endmacro %}

{% macro get_create_st_internal(relation, configuration_changes, sql) %}
{%- set partition_by = configuration_changes.changes["partition_by"] -%}
{%- set tblproperties = configuration_changes.changes["tblproperties"] -%}
{%- set comment = configuration_changes.changes["comment"] -%}
{%- set refresh = configuration_changes.changes["refresh"] -%}
{%- set partition_by = configuration_changes.changes["partition_by"].partition_by -%}
{%- set tblproperties = configuration_changes.changes["tblproperties"].tblproperties -%}
{%- set comment = configuration_changes.changes["comment"].comment -%}
CREATE OR REFRESH STREAMING TABLE {{ relation }}
{% if partition_by -%}
{{ get_create_sql_partition_by(partition_by.data.partition_by) }}
{{ get_create_sql_partition_by(partition_by) }}
{%- endif %}
{% if comment -%}
{{ get_create_sql_comment(comment.data.comment) }}
{{ get_create_sql_comment(comment) }}
{%- endif %}
{% if tblproperties -%}
{{ get_create_sql_tblproperties(tblproperties.data.tblproperties) }}
{{ get_create_sql_tblproperties(tblproperties) }}
{%- endif %}
AS {{ sql }}
{% endmacro %}

{% macro get_alter_st_internal(relation, configuration_changes) %}
{%- set refresh = configuration_changes.changes["refresh"] -%}
{%- if refresh and refresh.data.cron -%}
{%- if refresh and refresh.cron -%}
ALTER STREAMING TABLE {{ relation }}
{{ get_alter_sql_refresh_schedule(refresh.data.cron, refresh.data.time_zone_value, False) -}}
{{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, False) -}}
{%- endif -%}
{% endmacro %}
55 changes: 0 additions & 55 deletions tests/unit/relation_configs/test_config_base.py

This file was deleted.

9 changes: 2 additions & 7 deletions tests/unit/relation_configs/test_materialized_view_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from agate import Table, Row
from mock import Mock
from dbt.adapters.databricks.relation_configs.base import RelationChange
from dbt.adapters.databricks.relation_configs.comment import CommentConfig
from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig
from dbt.adapters.databricks.relation_configs.partitioning import PartitionedByConfig
Expand Down Expand Up @@ -115,10 +114,6 @@ def test_get_changeset__some_changes(self):
assert changeset.has_changes
assert changeset.requires_full_refresh
assert changeset.changes == {
"partition_by": RelationChange(
data=PartitionedByConfig(partition_by=["col_a"]), requires_full_refresh=True
),
"refresh": RelationChange(
data=RefreshConfig(cron="*/5 * * * *"), requires_full_refresh=False
),
"partition_by": PartitionedByConfig(partition_by=["col_a"]),
"refresh": RefreshConfig(cron="*/5 * * * *"),
}
32 changes: 7 additions & 25 deletions tests/unit/relation_configs/test_streaming_table_config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from agate import Table
from mock import Mock
from dbt.adapters.databricks.relation_configs.base import RelationChange
from dbt.adapters.databricks.relation_configs.comment import CommentConfig
from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig
from dbt.adapters.databricks.relation_configs.partitioning import PartitionedByConfig
Expand Down Expand Up @@ -84,17 +83,9 @@ def test_get_changeset__no_changes(self):
changeset = new.get_changeset(old)
assert not changeset.requires_full_refresh
assert changeset.changes == {
"tblproperties": RelationChange(
data=TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
requires_full_refresh=False,
),
"comment": RelationChange(
data=CommentConfig(comment="This is the table comment"), requires_full_refresh=False
),
"partition_by": RelationChange(
data=PartitionedByConfig(partition_by=["col_a", "col_b"]),
requires_full_refresh=False,
),
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
"comment": CommentConfig(comment="This is the table comment"),
"partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]),
}

def test_get_changeset__some_changes(self):
Expand All @@ -119,17 +110,8 @@ def test_get_changeset__some_changes(self):
assert changeset.has_changes
assert changeset.requires_full_refresh
assert changeset.changes == {
"partition_by": RelationChange(
data=PartitionedByConfig(partition_by=["col_a"]), requires_full_refresh=True
),
"comment": RelationChange(
data=CommentConfig(comment="This is the table comment"), requires_full_refresh=False
),
"tblproperties": RelationChange(
data=TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
requires_full_refresh=False,
),
"refresh": RelationChange(
data=RefreshConfig(cron="*/5 * * * *"), requires_full_refresh=False
),
"partition_by": PartitionedByConfig(partition_by=["col_a"]),
"comment": CommentConfig(comment="This is the table comment"),
"tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}),
"refresh": RefreshConfig(cron="*/5 * * * *"),
}

0 comments on commit 28a019a

Please sign in to comment.