Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Materialized View Config Level Up #536

Merged
merged 35 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c357126
wip
benc-db Dec 8, 2023
1bddd0b
wip
benc-db Dec 12, 2023
c6c421d
wip
benc-db Dec 12, 2023
5f12412
wip
benc-db Dec 14, 2023
522bb9f
mostly fleshed out
benc-db Dec 18, 2023
4c6fb7c
type issue
benc-db Dec 18, 2023
d2579bb
macro issue fix
benc-db Dec 18, 2023
2d1cb77
materialization typo
benc-db Dec 18, 2023
0b42534
address dependabot issue
benc-db Dec 19, 2023
d163d28
minor macro issue
benc-db Dec 19, 2023
c948b1b
lint passing
benc-db Dec 19, 2023
e1ae870
fixing unit tests, hopefully
benc-db Dec 19, 2023
f0fb0a7
does this fix it?
benc-db Dec 19, 2023
f3f1cd7
wip
benc-db Dec 19, 2023
383dc9a
passing first set of functional tests
benc-db Dec 19, 2023
8d79d5a
all functional tests passing
benc-db Dec 20, 2023
ab00670
wip
benc-db Dec 20, 2023
d975c6c
trying to pass linter
benc-db Dec 20, 2023
eda0f90
Revert "trying to pass linter"
benc-db Dec 20, 2023
daea094
skip mv change tests for non-sql profiles
benc-db Dec 20, 2023
8d3bab5
redone, successfully
benc-db Dec 22, 2023
de1cc29
Changelog
benc-db Jan 2, 2024
b7f43c1
does this fix test failures
benc-db Jan 3, 2024
a4f5c0d
doc strings
benc-db Jan 3, 2024
a99093c
fix rebase
benc-db Jan 18, 2024
d0026cc
this is lame, but need to ensure test consistency...investigating oth…
benc-db Jan 19, 2024
dc51813
trying a different approach
benc-db Jan 19, 2024
d40d8fb
bringing back the wait
benc-db Jan 20, 2024
6903bcf
make a test more reliable
benc-db Jan 24, 2024
6c95405
drop mvs
benc-db Jan 24, 2024
e43703e
touch up regex behavior
benc-db Jan 24, 2024
63f02d1
Changelog
benc-db Jan 24, 2024
84e63d7
more sleep
benc-db Jan 25, 2024
145ff50
tests
benc-db Jan 25, 2024
057fe6c
fix linting
benc-db Jan 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .python-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.11
3.8
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
## dbt-databricks 1.7.5 (TBD)

### Features

- Support `on_config_change` for materialized views, expand the supported config options ([536](https://github.com/databricks/dbt-databricks/pull/536)))

## dbt-databricks 1.7.4 (Jan 24, 2024)

### Features

- Support `on_config_change` for materialized views, expand the supported config options ([536](https://github.com/databricks/dbt-databricks/pull/536)))

### Fixes

- Added python model specific connection handling to prevent using invalid sessions ([547](https://github.com/databricks/dbt-databricks/pull/547))
Expand Down
9 changes: 7 additions & 2 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@

logger = AdapterLogger("Databricks")

mv_refresh_regex = re.compile(r"refresh\s+materialized\s+view\s+([`\w.]+)", re.IGNORECASE)
st_refresh_regex = re.compile(
r"create\s+or\s+refresh\s+streaming\s+table\s+([`\w.]+)", re.IGNORECASE
)


class DbtCoreHandler(logging.Handler):
def __init__(self, level: Union[str, int], dbt_logger: AdapterLogger):
Expand Down Expand Up @@ -1494,9 +1499,9 @@ def _should_poll_refresh(sql: str) -> Tuple[bool, str]:
# if the command was to refresh a materialized view we need to poll
# the pipeline until the refresh is finished.
name = ""
refresh_search = re.search(r"refresh\s+materialized\s+view\s+([`\w.]+)", sql)
refresh_search = mv_refresh_regex.search(sql)
if not refresh_search:
refresh_search = re.search(r"create\s+or\s+refresh\s+streaming\s+table\s+([`\w.]+)", sql)
refresh_search = st_refresh_regex.search(sql)

if refresh_search:
name = refresh_search.group(1).replace("`", "")
Expand Down
37 changes: 35 additions & 2 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
)
from dbt.adapters.spark.impl import (
SparkAdapter,
DESCRIBE_TABLE_EXTENDED_MACRO_NAME,
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME,
KEY_TABLE_OWNER,
KEY_TABLE_STATISTICS,
Expand All @@ -41,7 +42,7 @@
from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table
from dbt.contracts.connection import AdapterResponse, Connection
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import ResultNode
from dbt.contracts.graph.nodes import ResultNode, ModelNode
from dbt.contracts.relation import RelationType
import dbt.exceptions
from dbt.events import AdapterLogger
Expand All @@ -58,7 +59,10 @@
DatabricksRelation,
DatabricksRelationType,
)
from dbt.adapters.databricks.utils import redact_credentials, undefined_proof
from dbt.adapters.databricks.relation_configs.base import DatabricksRelationConfigBase
from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig
from dbt.adapters.databricks.utils import redact_credentials, undefined_proof, get_first_row
from dbt.adapters.relation_configs.config_base import RelationResults


logger = AdapterLogger("Databricks")
Expand Down Expand Up @@ -734,3 +738,32 @@ def get_persist_doc_columns(
return_columns[name] = columns[name]

return return_columns

@available.parse_none
def materialized_view_config_from_model(self, model: ModelNode) -> MaterializedViewConfig:
return MaterializedViewConfig.from_model_node(model) # type: ignore

@available.parse_none
def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase:
if relation.type == RelationType.MaterializedView:
results = self.describe_materialized_view(relation)
return MaterializedViewConfig.from_results(results)
else:
raise dbt.exceptions.DbtRuntimeError(
f"The method `DatabricksAdapter.get_relation_config` is not implemented "
f"for the relation type: {relation.type}"
)

def describe_materialized_view(self, relation: DatabricksRelation) -> RelationResults:
kwargs = {"table_name": relation}
results: RelationResults = dict()
results["describe_extended"] = self.execute_macro(
DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs
)

kwargs = {"relation": relation}
results["information_schema.views"] = get_first_row(
self.execute_macro("get_view_description", kwargs=kwargs)
)
results["show_tblproperties"] = self.execute_macro("fetch_tbl_properties", kwargs=kwargs)
return results
19 changes: 18 additions & 1 deletion dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dbt.adapters.databricks.utils import remove_undefined
from dbt.utils import filter_null_values, classproperty
from dbt.exceptions import DbtRuntimeError
from dbt.adapters.base.relation import RelationType

KEY_TABLE_PROVIDER = "Provider"

Expand All @@ -32,7 +33,7 @@ class DatabricksRelationType(StrEnum):
Table = "table"
View = "view"
CTE = "cte"
MaterializedView = "materializedview"
MaterializedView = "materialized_view"
External = "external"
StreamingTable = "streamingtable"

Expand Down Expand Up @@ -65,6 +66,22 @@ def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]:
data["path"]["database"] = remove_undefined(data["path"]["database"])
return data

renameable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
}
)

# list relations that can be atomically replaced (e.g. `CREATE OR REPLACE my_relation..`
# versus `DROP` and `CREATE`)
replaceable_relations = frozenset(
{
RelationType.View,
RelationType.Table,
}
)

def has_information(self) -> bool:
return self.metadata is not None

Expand Down
156 changes: 156 additions & 0 deletions dbt/adapters/databricks/relation_configs/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from abc import ABC, abstractmethod
from pydantic import BaseModel, ConfigDict
from typing import ClassVar, Dict, Generic, List, Optional, TypeVar
from typing_extensions import Self, Type

from dbt.adapters.relation_configs.config_base import RelationResults
from dbt.contracts.graph.nodes import ModelNode


class DatabricksComponentConfig(BaseModel, ABC):
"""Class for encapsulating a single component of a Databricks relation config.

Ex: A materialized view has a `query` component, which is a string that if changed, requires a
full refresh.
"""

model_config = ConfigDict(frozen=True)

@property
@abstractmethod
def requires_full_refresh(self) -> bool:
"""Whether or not the relation that is configured by this component requires a full refresh
(i.e. a drop and recreate) if this component has changed.
"""

raise NotImplementedError("Must be implemented by subclass")

def get_diff(self, other: Self) -> Optional[Self]:
"""Get the config that must be applied when this component differs from the existing
version. This method is intended to only be called on the new version (i.e. the version
specified in the dbt project).

If the difference does not require any changes to the existing relation, this method should
return None. If some partial change can be applied to the existing relation, the
implementing component should override this method to return an instance representing the
partial change; however, care should be taken to ensure that the returned object retains
the complete config specified in the dbt project, so as to support rendering the `create`
as well as the `alter` statements, for the case where a different component requires full
refresh.

Consider updating tblproperties: we can apply only the differences to an existing relation,
but if some other modified component requires us to completely replace the relation, we
should still be able to construct appropriate create clause from the object returned by
this method.
"""

if self != other:
return self
return None


class DatabricksRelationChangeSet(BaseModel):
"""Class for encapsulating the changes that need to be applied to a Databricks relation."""

model_config = ConfigDict(frozen=True)
changes: Dict[str, DatabricksComponentConfig]

@property
def requires_full_refresh(self) -> bool:
"""Whether or not the relation that is to be configured by this change set requires a full
refresh.
"""

return any(change.requires_full_refresh for change in self.changes.values())

@property
def has_changes(self) -> bool:
"""Whether or not this change set has any changes that need to be applied."""

return len(self.changes) > 0


Component = TypeVar("Component", bound=DatabricksComponentConfig)


class DatabricksComponentProcessor(ABC, Generic[Component]):
"""Class for encapsulating the logic for extracting a single config component from either the
project config, or the existing relation.
"""

# The name of the component. This is used as the key in the config dictionary of the relation
# config.
name: ClassVar[str]

@classmethod
@abstractmethod
def from_results(cls, row: RelationResults) -> Component:
"""Extract the component from the results of a query against the existing relation."""

raise NotImplementedError("Must be implemented by subclass")

@classmethod
@abstractmethod
def from_model_node(cls, model_node: ModelNode) -> Component:
"""Extract the component from the model node.

While some components, e.g. query, can be extracted directly from the model node,
specialized Databricks config can be found in model_node.config.extra.
"""

raise NotImplementedError("Must be implemented by subclass")


class DatabricksRelationConfigBase(BaseModel, ABC):
"""Class for encapsulating the config of a Databricks relation.

Ex: all of the config for specifying a Materialized View is handled by MaterializedViewConfig.
Concretely though, since that config is compatible with the default behavior of this class,
only the list of component processors is specified by its subclass.
"""

# The list of components that make up the relation config. In the base implemenation, these
# components are applied sequentially to either the existing relation, or the model node, to
# build up the config.
config_components: ClassVar[List[Type[DatabricksComponentProcessor]]]
config: Dict[str, DatabricksComponentConfig]

@classmethod
def from_model_node(cls, model_node: ModelNode) -> Self:
"""Build the relation config from a model node."""

config_dict: Dict[str, DatabricksComponentConfig] = {}
for component in cls.config_components:
model_component = component.from_model_node(model_node)
if model_component:
config_dict[component.name] = model_component

return cls(config=config_dict)

@classmethod
def from_results(cls, results: RelationResults) -> Self:
"""Build the relation config from the results of a query against the existing relation."""

config_dict: Dict[str, DatabricksComponentConfig] = {}
for component in cls.config_components:
result_component = component.from_results(results)
if result_component:
config_dict[component.name] = result_component

return cls(config=config_dict)

def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]:
"""Get the changeset that must be applied to the existing relation to make it match the
current state of the dbt project.
"""

changes = {}

for key, value in self.config.items():
diff = value.get_diff(existing.config[key])
if diff:
changes[key] = diff

if len(changes) > 0:
return DatabricksRelationChangeSet(changes=changes)
return None
38 changes: 38 additions & 0 deletions dbt/adapters/databricks/relation_configs/comment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Optional, ClassVar
from dbt.contracts.graph.nodes import ModelNode

from dbt.adapters.databricks.relation_configs.base import (
DatabricksComponentConfig,
DatabricksComponentProcessor,
)
from dbt.adapters.relation_configs.config_base import RelationResults


class CommentConfig(DatabricksComponentConfig):
"""Component encapsulating the relation-level comment."""

comment: Optional[str] = None

@property
def requires_full_refresh(self) -> bool:
# TODO: This is only True for MVs since they don't currently allow ALTER VIEW to change the
# comment. Should be False for tables and views, if and when they move to this approach.
return True


class CommentProcessor(DatabricksComponentProcessor[CommentConfig]):
name: ClassVar[str] = "comment"

@classmethod
def from_results(cls, results: RelationResults) -> CommentConfig:
table = results["describe_extended"]
for row in table.rows:
if row[0] == "Comment":
return CommentConfig(comment=row[1])
return CommentConfig()

@classmethod
def from_model_node(cls, model_node: ModelNode) -> CommentConfig:
if model_node.description is not None:
return CommentConfig(comment=model_node.description)
return CommentConfig()
28 changes: 28 additions & 0 deletions dbt/adapters/databricks/relation_configs/materialized_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from dbt.adapters.databricks.relation_configs.base import (
DatabricksRelationConfigBase,
)
from dbt.adapters.databricks.relation_configs.comment import (
CommentProcessor,
)
from dbt.adapters.databricks.relation_configs.partitioning import (
PartitionedByProcessor,
)
from dbt.adapters.databricks.relation_configs.query import (
QueryProcessor,
)
from dbt.adapters.databricks.relation_configs.refresh import (
RefreshProcessor,
)
from dbt.adapters.databricks.relation_configs.tblproperties import (
TblPropertiesProcessor,
)


class MaterializedViewConfig(DatabricksRelationConfigBase):
config_components = [
PartitionedByProcessor,
CommentProcessor,
TblPropertiesProcessor,
RefreshProcessor,
QueryProcessor,
]
Loading
Loading