diff --git a/.python-version b/.python-version index 2c0733315..cc1923a40 100644 --- a/.python-version +++ b/.python-version @@ -1 +1 @@ -3.11 +3.8 diff --git a/CHANGELOG.md b/CHANGELOG.md index 0dc3b5989..25021014a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ +## dbt-databricks 1.7.5 (TBD) + +### Features + +- Support `on_config_change` for materialized views, expand the supported config options ([536](https://github.com/databricks/dbt-databricks/pull/536)) + ## dbt-databricks 1.7.4 (Jan 24, 2024) +### Features + +- Support `on_config_change` for materialized views, expand the supported config options ([536](https://github.com/databricks/dbt-databricks/pull/536)) + ### Fixes - Added python model specific connection handling to prevent using invalid sessions ([547](https://github.com/databricks/dbt-databricks/pull/547)) diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py index 94a59b7c9..707d12e22 100644 --- a/dbt/adapters/databricks/connections.py +++ b/dbt/adapters/databricks/connections.py @@ -81,6 +81,11 @@ logger = AdapterLogger("Databricks") +mv_refresh_regex = re.compile(r"refresh\s+materialized\s+view\s+([`\w.]+)", re.IGNORECASE) +st_refresh_regex = re.compile( + r"create\s+or\s+refresh\s+streaming\s+table\s+([`\w.]+)", re.IGNORECASE +) + class DbtCoreHandler(logging.Handler): def __init__(self, level: Union[str, int], dbt_logger: AdapterLogger): @@ -1494,9 +1499,9 @@ def _should_poll_refresh(sql: str) -> Tuple[bool, str]: # if the command was to refresh a materialized view we need to poll # the pipeline until the refresh is finished. 
name = "" - refresh_search = re.search(r"refresh\s+materialized\s+view\s+([`\w.]+)", sql) + refresh_search = mv_refresh_regex.search(sql) if not refresh_search: - refresh_search = re.search(r"create\s+or\s+refresh\s+streaming\s+table\s+([`\w.]+)", sql) + refresh_search = st_refresh_regex.search(sql) if refresh_search: name = refresh_search.group(1).replace("`", "") diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 2eaa6a87f..e10250c5c 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -31,6 +31,7 @@ ) from dbt.adapters.spark.impl import ( SparkAdapter, + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, KEY_TABLE_OWNER, KEY_TABLE_STATISTICS, @@ -41,7 +42,7 @@ from dbt.clients.agate_helper import DEFAULT_TYPE_TESTER, empty_table from dbt.contracts.connection import AdapterResponse, Connection from dbt.contracts.graph.manifest import Manifest -from dbt.contracts.graph.nodes import ResultNode +from dbt.contracts.graph.nodes import ResultNode, ModelNode from dbt.contracts.relation import RelationType import dbt.exceptions from dbt.events import AdapterLogger @@ -58,7 +59,10 @@ DatabricksRelation, DatabricksRelationType, ) -from dbt.adapters.databricks.utils import redact_credentials, undefined_proof +from dbt.adapters.databricks.relation_configs.base import DatabricksRelationConfigBase +from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig +from dbt.adapters.databricks.utils import redact_credentials, undefined_proof, get_first_row +from dbt.adapters.relation_configs.config_base import RelationResults logger = AdapterLogger("Databricks") @@ -734,3 +738,32 @@ def get_persist_doc_columns( return_columns[name] = columns[name] return return_columns + + @available.parse_none + def materialized_view_config_from_model(self, model: ModelNode) -> MaterializedViewConfig: + return MaterializedViewConfig.from_model_node(model) # type: ignore + + 
@available.parse_none + def get_relation_config(self, relation: DatabricksRelation) -> DatabricksRelationConfigBase: + if relation.type == RelationType.MaterializedView: + results = self.describe_materialized_view(relation) + return MaterializedViewConfig.from_results(results) + else: + raise dbt.exceptions.DbtRuntimeError( + f"The method `DatabricksAdapter.get_relation_config` is not implemented " + f"for the relation type: {relation.type}" + ) + + def describe_materialized_view(self, relation: DatabricksRelation) -> RelationResults: + kwargs = {"table_name": relation} + results: RelationResults = dict() + results["describe_extended"] = self.execute_macro( + DESCRIBE_TABLE_EXTENDED_MACRO_NAME, kwargs=kwargs + ) + + kwargs = {"relation": relation} + results["information_schema.views"] = get_first_row( + self.execute_macro("get_view_description", kwargs=kwargs) + ) + results["show_tblproperties"] = self.execute_macro("fetch_tbl_properties", kwargs=kwargs) + return results diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index 07046f98b..17cdc5501 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -10,6 +10,7 @@ from dbt.adapters.databricks.utils import remove_undefined from dbt.utils import filter_null_values, classproperty from dbt.exceptions import DbtRuntimeError +from dbt.adapters.base.relation import RelationType KEY_TABLE_PROVIDER = "Provider" @@ -32,7 +33,7 @@ class DatabricksRelationType(StrEnum): Table = "table" View = "view" CTE = "cte" - MaterializedView = "materializedview" + MaterializedView = "materialized_view" External = "external" StreamingTable = "streamingtable" @@ -65,6 +66,22 @@ def __pre_deserialize__(cls, data: Dict[Any, Any]) -> Dict[Any, Any]: data["path"]["database"] = remove_undefined(data["path"]["database"]) return data + renameable_relations = frozenset( + { + RelationType.View, + RelationType.Table, + } + ) + + # list relations that can be atomically 
replaced (e.g. `CREATE OR REPLACE my_relation..` + # versus `DROP` and `CREATE`) + replaceable_relations = frozenset( + { + RelationType.View, + RelationType.Table, + } + ) + def has_information(self) -> bool: return self.metadata is not None diff --git a/dbt/adapters/databricks/relation_configs/base.py b/dbt/adapters/databricks/relation_configs/base.py new file mode 100644 index 000000000..7a6216eac --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/base.py @@ -0,0 +1,156 @@ +from abc import ABC, abstractmethod +from pydantic import BaseModel, ConfigDict +from typing import ClassVar, Dict, Generic, List, Optional, TypeVar +from typing_extensions import Self, Type + +from dbt.adapters.relation_configs.config_base import RelationResults +from dbt.contracts.graph.nodes import ModelNode + + +class DatabricksComponentConfig(BaseModel, ABC): + """Class for encapsulating a single component of a Databricks relation config. + + Ex: A materialized view has a `query` component, which is a string that if changed, requires a + full refresh. + """ + + model_config = ConfigDict(frozen=True) + + @property + @abstractmethod + def requires_full_refresh(self) -> bool: + """Whether or not the relation that is configured by this component requires a full refresh + (i.e. a drop and recreate) if this component has changed. + """ + + raise NotImplementedError("Must be implemented by subclass") + + def get_diff(self, other: Self) -> Optional[Self]: + """Get the config that must be applied when this component differs from the existing + version. This method is intended to only be called on the new version (i.e. the version + specified in the dbt project). + + If the difference does not require any changes to the existing relation, this method should + return None. 
If some partial change can be applied to the existing relation, the + implementing component should override this method to return an instance representing the + partial change; however, care should be taken to ensure that the returned object retains + the complete config specified in the dbt project, so as to support rendering the `create` + as well as the `alter` statements, for the case where a different component requires full + refresh. + + Consider updating tblproperties: we can apply only the differences to an existing relation, + but if some other modified component requires us to completely replace the relation, we + should still be able to construct appropriate create clause from the object returned by + this method. + """ + + if self != other: + return self + return None + + +class DatabricksRelationChangeSet(BaseModel): + """Class for encapsulating the changes that need to be applied to a Databricks relation.""" + + model_config = ConfigDict(frozen=True) + changes: Dict[str, DatabricksComponentConfig] + + @property + def requires_full_refresh(self) -> bool: + """Whether or not the relation that is to be configured by this change set requires a full + refresh. + """ + + return any(change.requires_full_refresh for change in self.changes.values()) + + @property + def has_changes(self) -> bool: + """Whether or not this change set has any changes that need to be applied.""" + + return len(self.changes) > 0 + + +Component = TypeVar("Component", bound=DatabricksComponentConfig) + + +class DatabricksComponentProcessor(ABC, Generic[Component]): + """Class for encapsulating the logic for extracting a single config component from either the + project config, or the existing relation. + """ + + # The name of the component. This is used as the key in the config dictionary of the relation + # config. 
+ name: ClassVar[str] + + @classmethod + @abstractmethod + def from_results(cls, row: RelationResults) -> Component: + """Extract the component from the results of a query against the existing relation.""" + + raise NotImplementedError("Must be implemented by subclass") + + @classmethod + @abstractmethod + def from_model_node(cls, model_node: ModelNode) -> Component: + """Extract the component from the model node. + + While some components, e.g. query, can be extracted directly from the model node, + specialized Databricks config can be found in model_node.config.extra. + """ + + raise NotImplementedError("Must be implemented by subclass") + + +class DatabricksRelationConfigBase(BaseModel, ABC): + """Class for encapsulating the config of a Databricks relation. + + Ex: all of the config for specifying a Materialized View is handled by MaterializedViewConfig. + Concretely though, since that config is compatible with the default behavior of this class, + only the list of component processors is specified by its subclass. + """ + + # The list of components that make up the relation config. In the base implementation, these + # components are applied sequentially to either the existing relation, or the model node, to + # build up the config. 
+ config_components: ClassVar[List[Type[DatabricksComponentProcessor]]] + config: Dict[str, DatabricksComponentConfig] + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> Self: + """Build the relation config from a model node.""" + + config_dict: Dict[str, DatabricksComponentConfig] = {} + for component in cls.config_components: + model_component = component.from_model_node(model_node) + if model_component: + config_dict[component.name] = model_component + + return cls(config=config_dict) + + @classmethod + def from_results(cls, results: RelationResults) -> Self: + """Build the relation config from the results of a query against the existing relation.""" + + config_dict: Dict[str, DatabricksComponentConfig] = {} + for component in cls.config_components: + result_component = component.from_results(results) + if result_component: + config_dict[component.name] = result_component + + return cls(config=config_dict) + + def get_changeset(self, existing: Self) -> Optional[DatabricksRelationChangeSet]: + """Get the changeset that must be applied to the existing relation to make it match the + current state of the dbt project. 
+ """ + + changes = {} + + for key, value in self.config.items(): + diff = value.get_diff(existing.config[key]) + if diff: + changes[key] = diff + + if len(changes) > 0: + return DatabricksRelationChangeSet(changes=changes) + return None diff --git a/dbt/adapters/databricks/relation_configs/comment.py b/dbt/adapters/databricks/relation_configs/comment.py new file mode 100644 index 000000000..94e9c359c --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/comment.py @@ -0,0 +1,38 @@ +from typing import Optional, ClassVar +from dbt.contracts.graph.nodes import ModelNode + +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksComponentProcessor, +) +from dbt.adapters.relation_configs.config_base import RelationResults + + +class CommentConfig(DatabricksComponentConfig): + """Component encapsulating the relation-level comment.""" + + comment: Optional[str] = None + + @property + def requires_full_refresh(self) -> bool: + # TODO: This is only True for MVs since they don't currently allow ALTER VIEW to change the + # comment. Should be False for tables and views, if and when they move to this approach. 
+ return True + + +class CommentProcessor(DatabricksComponentProcessor[CommentConfig]): + name: ClassVar[str] = "comment" + + @classmethod + def from_results(cls, results: RelationResults) -> CommentConfig: + table = results["describe_extended"] + for row in table.rows: + if row[0] == "Comment": + return CommentConfig(comment=row[1]) + return CommentConfig() + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> CommentConfig: + if model_node.description is not None: + return CommentConfig(comment=model_node.description) + return CommentConfig() diff --git a/dbt/adapters/databricks/relation_configs/materialized_view.py b/dbt/adapters/databricks/relation_configs/materialized_view.py new file mode 100644 index 000000000..46fd1627b --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/materialized_view.py @@ -0,0 +1,28 @@ +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksRelationConfigBase, +) +from dbt.adapters.databricks.relation_configs.comment import ( + CommentProcessor, +) +from dbt.adapters.databricks.relation_configs.partitioning import ( + PartitionedByProcessor, +) +from dbt.adapters.databricks.relation_configs.query import ( + QueryProcessor, +) +from dbt.adapters.databricks.relation_configs.refresh import ( + RefreshProcessor, +) +from dbt.adapters.databricks.relation_configs.tblproperties import ( + TblPropertiesProcessor, +) + + +class MaterializedViewConfig(DatabricksRelationConfigBase): + config_components = [ + PartitionedByProcessor, + CommentProcessor, + TblPropertiesProcessor, + RefreshProcessor, + QueryProcessor, + ] diff --git a/dbt/adapters/databricks/relation_configs/partitioning.py b/dbt/adapters/databricks/relation_configs/partitioning.py new file mode 100644 index 000000000..6cc288cd8 --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/partitioning.py @@ -0,0 +1,46 @@ +import itertools +from typing import ClassVar, List + +from dbt.adapters.relation_configs.config_base import 
RelationResults +from dbt.contracts.graph.nodes import ModelNode +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksComponentProcessor, +) + + +class PartitionedByConfig(DatabricksComponentConfig): + """Component encapsulating the partitioning of relations.""" + + partition_by: List[str] + + @property + def requires_full_refresh(self) -> bool: + return True + + +class PartitionedByProcessor(DatabricksComponentProcessor): + name: ClassVar[str] = "partition_by" + + @classmethod + def from_results(cls, results: RelationResults) -> PartitionedByConfig: + table = results["describe_extended"] + cols = [] + rows = itertools.takewhile( + lambda row: row[0], + itertools.dropwhile(lambda row: row[0] != "# Partition Information", table.rows), + ) + for row in rows: + if not row[0].startswith("# "): + cols.append(row[0]) + + return PartitionedByConfig(partition_by=cols) + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> PartitionedByConfig: + partition_by = model_node.config.extra.get("partition_by") + if isinstance(partition_by, str): + return PartitionedByConfig(partition_by=[partition_by]) + if not partition_by: + return PartitionedByConfig(partition_by=[]) + return PartitionedByConfig(partition_by=partition_by) diff --git a/dbt/adapters/databricks/relation_configs/query.py b/dbt/adapters/databricks/relation_configs/query.py new file mode 100644 index 000000000..2c8cc37e2 --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/query.py @@ -0,0 +1,36 @@ +from typing import ClassVar +from dbt.adapters.relation_configs.config_base import RelationResults +from dbt.contracts.graph.nodes import ModelNode +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksComponentProcessor, +) +from dbt.exceptions import DbtRuntimeError + + +class QueryConfig(DatabricksComponentConfig): + """Component encapsulating the query that defines a relation.""" + + query: str + + 
@property + def requires_full_refresh(self) -> bool: + return True + + +class QueryProcessor(DatabricksComponentProcessor[QueryConfig]): + name: ClassVar[str] = "query" + + @classmethod + def from_results(cls, result: RelationResults) -> QueryConfig: + row = result["information_schema.views"] + return QueryConfig(query=row["view_definition"]) + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> QueryConfig: + query = model_node.compiled_code + + if query: + return QueryConfig(query=query.strip()) + else: + raise DbtRuntimeError(f"Cannot compile model {model_node.unique_id} with no SQL query") diff --git a/dbt/adapters/databricks/relation_configs/refresh.py b/dbt/adapters/databricks/relation_configs/refresh.py new file mode 100644 index 000000000..7768ac4fd --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/refresh.py @@ -0,0 +1,79 @@ +import re +from typing import ClassVar, Optional + +from dbt.adapters.relation_configs.config_base import RelationResults +from dbt.exceptions import DbtRuntimeError +from dbt.contracts.graph.nodes import ModelNode + +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksComponentProcessor, +) + +SCHEDULE_REGEX = re.compile(r"CRON '(.*)' AT TIME ZONE '(.*)'") + + +class RefreshConfig(DatabricksComponentConfig): + """Component encapsulating the refresh schedule of a relation.""" + + cron: Optional[str] = None + time_zone_value: Optional[str] = None + + # Property indicating whether the schedule change should be accomplished by an ADD SCHEDULE + # vs an ALTER SCHEDULE. This is only True when modifying an existing schedule, rather than + # switching from manual refresh to scheduled or vice versa. 
+ is_altered: bool = False + + @property + def requires_full_refresh(self) -> bool: + return False + + def get_diff(self, other: "RefreshConfig") -> Optional["RefreshConfig"]: + if self != other: + return RefreshConfig( + cron=self.cron, + time_zone_value=self.time_zone_value, + is_altered=self.cron is not None and other.cron is not None, + ) + return None + + +class RefreshProcessor(DatabricksComponentProcessor[RefreshConfig]): + name: ClassVar[str] = "refresh" + + @classmethod + def from_results(cls, results: RelationResults) -> RefreshConfig: + table = results["describe_extended"] + for row in table.rows: + if row[0] == "Refresh Schedule": + if row[1] == "MANUAL": + return RefreshConfig() + + match = SCHEDULE_REGEX.match(row[1]) + + if match: + cron, time_zone_value = match.groups() + return RefreshConfig(cron=cron, time_zone_value=time_zone_value) + + raise DbtRuntimeError( + f"Could not parse schedule from description: {row[1]}." + " This is most likely a bug in the dbt-databricks adapter," + " so please file an issue!" + ) + + raise DbtRuntimeError( + "Could not parse schedule for table." + " This is most likely a bug in the dbt-databricks adapter, so please file an issue!" 
+ ) + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> RefreshConfig: + schedule = model_node.config.extra.get("schedule") + if schedule: + if "cron" not in schedule: + raise DbtRuntimeError(f"Schedule config must contain a 'cron' key, got {schedule}") + return RefreshConfig( + cron=schedule["cron"], time_zone_value=schedule.get("time_zone_value") + ) + else: + return RefreshConfig() diff --git a/dbt/adapters/databricks/relation_configs/tblproperties.py b/dbt/adapters/databricks/relation_configs/tblproperties.py new file mode 100644 index 000000000..d71c36b13 --- /dev/null +++ b/dbt/adapters/databricks/relation_configs/tblproperties.py @@ -0,0 +1,65 @@ +from typing import Any, Dict, ClassVar, List +from dbt.contracts.graph.nodes import ModelNode + +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + DatabricksComponentProcessor, +) +from dbt.adapters.relation_configs.config_base import RelationResults +from dbt.exceptions import DbtRuntimeError + + +class TblPropertiesConfig(DatabricksComponentConfig): + """Component encapsulating the tblproperties of a relation.""" + + tblproperties: Dict[str, str] + + # List of tblproperties that should be ignored when comparing configs. These are generally + # set by Databricks and are not user-configurable. + ignore_list: List[str] = ["pipelines.pipelineId"] + + @property + def requires_full_refresh(self) -> bool: + # TODO: This is only True for MVs since they don't currently allow ALTER VIEW to change the + # tblproperties. Should be False for tables and views, if and when they move to this + # approach. 
+ return True + + def __eq__(self, __value: Any) -> bool: + """Override equality check to ignore certain tblproperties.""" + + if not isinstance(__value, TblPropertiesConfig): + return False + + def _without_ignore_list(d: Dict[str, str]) -> Dict[str, str]: + return {k: v for k, v in d.items() if k not in self.ignore_list} + + return _without_ignore_list(self.tblproperties) == _without_ignore_list( + __value.tblproperties + ) + + +class TblPropertiesProcessor(DatabricksComponentProcessor[TblPropertiesConfig]): + name: ClassVar[str] = "tblproperties" + + @classmethod + def from_results(cls, results: RelationResults) -> TblPropertiesConfig: + table = results.get("show_tblproperties") + tblproperties = dict() + + if table: + for row in table.rows: + tblproperties[str(row[0])] = str(row[1]) + + return TblPropertiesConfig(tblproperties=tblproperties) + + @classmethod + def from_model_node(cls, model_node: ModelNode) -> TblPropertiesConfig: + tblproperties = model_node.config.extra.get("tblproperties") + if not tblproperties: + return TblPropertiesConfig(tblproperties=dict()) + if isinstance(tblproperties, Dict): + tblproperties = {str(k): str(v) for k, v in tblproperties.items()} + return TblPropertiesConfig(tblproperties=tblproperties) + else: + raise DbtRuntimeError("tblproperties must be a dictionary") diff --git a/dbt/adapters/databricks/utils.py b/dbt/adapters/databricks/utils.py index 9fbbc411c..d2df7ee9e 100644 --- a/dbt/adapters/databricks/utils.py +++ b/dbt/adapters/databricks/utils.py @@ -5,6 +5,7 @@ from dbt.adapters.base import BaseAdapter from jinja2.runtime import Undefined +from agate import Table, Row A = TypeVar("A", bound=BaseAdapter) @@ -77,3 +78,9 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: def remove_ansi(line: str) -> str: ansi_escape = re.compile(r"(?:\x1B[@-_]|[\x80-\x9F])[0-?]*[ -/]*[@-~]") return ansi_escape.sub("", line) + + +def get_first_row(results: Table) -> Row: + if len(results.rows) == 0: + return Row(values=set()) + return 
results.rows[0] diff --git a/dbt/include/databricks/macros/adapters/metadata.sql b/dbt/include/databricks/macros/adapters/metadata.sql index 9f85ca828..343e98d35 100644 --- a/dbt/include/databricks/macros/adapters/metadata.sql +++ b/dbt/include/databricks/macros/adapters/metadata.sql @@ -69,4 +69,12 @@ {{ return(load_result('last_modified')) }} +{% endmacro %} + +{% macro get_view_description(relation) %} + {% call statement('get_view_description', fetch_result=True) -%} + select * from {{ relation.information_schema() }}.`views` where table_schema = '{{ relation.schema }}' and table_name = '{{ relation.identifier }}' + {% endcall %} + + {% do return(load_result('get_view_description').table) %} {% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/materializations/materialized_view.sql b/dbt/include/databricks/macros/materializations/materialized_view.sql new file mode 100644 index 000000000..8fb55f306 --- /dev/null +++ b/dbt/include/databricks/macros/materializations/materialized_view.sql @@ -0,0 +1,98 @@ +{% materialization materialized_view, adapter = 'databricks' %} + {% set existing_relation = load_cached_relation(this) %} + {% set target_relation = this.incorporate(type=this.MaterializedView) %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + {% set build_sql = materialized_view_get_build_sql(existing_relation, target_relation) %} + + {% if build_sql == '' %} + {{ materialized_view_execute_no_op(target_relation) }} + {% else %} + {{ materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) }} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} + + +{% macro materialized_view_get_build_sql(existing_relation, target_relation) %} + + {% set full_refresh_mode = should_full_refresh() %} + + -- determine the scenario we're in: create, full_refresh, alter, refresh data + {% if 
existing_relation is none %} + {% set build_sql = get_create_materialized_view_as_sql(target_relation, sql) %} + {% elif full_refresh_mode or not existing_relation.is_materialized_view %} + {% set build_sql = get_replace_sql(existing_relation, target_relation, sql) %} + {% else %} + + -- get config options + {% set on_configuration_change = config.get('on_configuration_change') %} + {% set configuration_changes = get_materialized_view_configuration_changes(existing_relation, config) %} + + {% if configuration_changes is none %} + {% set build_sql = refresh_materialized_view(target_relation) %} + + {% elif on_configuration_change == 'apply' %} + {% set build_sql = get_alter_materialized_view_as_sql(target_relation, configuration_changes, sql, existing_relation, None, None) %} + {% elif on_configuration_change == 'continue' %} + {% set build_sql = "" %} + {{ exceptions.warn("Configuration changes were identified and `on_configuration_change` was set to `continue` for `" ~ target_relation ~ "`") }} + {% elif on_configuration_change == 'fail' %} + {{ exceptions.raise_fail_fast_error("Configuration changes were identified and `on_configuration_change` was set to `fail` for `" ~ target_relation ~ "`") }} + + {% else %} + -- this only happens if the user provides a value other than 'apply', 'continue', 'fail' + {{ exceptions.raise_compiler_error("Unexpected configuration scenario") }} + + {% endif %} + + {% endif %} + + {% do return(build_sql) %} + +{% endmacro %} + + +{% macro materialized_view_execute_no_op(target_relation) %} + {% do store_raw_result( + name="main", + message="skip " ~ target_relation, + code="skip", + rows_affected="-1" + ) %} +{% endmacro %} + + +{% macro materialized_view_execute_build_sql(build_sql, existing_relation, target_relation, post_hooks) %} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set grant_config = config.get('grants') %} + + {%- if build_sql is string %} + {% call statement(name="main") %} + 
{{ build_sql }} + {% endcall %} + {%- else %} + {%- for sql in build_sql %} + {% call statement(name="main") %} + {{ sql }} + {% endcall %} + {% endfor %} + {% endif %} + + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/materializations/materialized_view/materialized_view.sql b/dbt/include/databricks/macros/materializations/materialized_view/materialized_view.sql deleted file mode 100644 index 6aef0539d..000000000 --- a/dbt/include/databricks/macros/materializations/materialized_view/materialized_view.sql +++ /dev/null @@ -1,43 +0,0 @@ -{% materialization materialized_view, adapter='databricks' %} - - {%- set identifier = model['alias'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set exists_as_materialized_view = (old_relation is not none and old_relation.is_materialized_view) -%} - - {%- set target_relation = api.Relation.create( - identifier=identifier, schema=schema, database=database, - type='materializedview') -%} - {% set grant_config = config.get('grants') %} - - {{ run_hooks(pre_hooks) }} - - {%- set full_refresh_mode = should_full_refresh() -%} - - {%- if exists_as_materialized_view and not full_refresh_mode -%} - -- refresh materialized view - {% call statement('main') -%} - {{ get_refresh_materialized_view_sql(target_relation) }} - {%- endcall %} - {%- else -%} - -- If there's a table with the same name and we weren't told to full refresh, - -- that's an error. If we were told to full refresh, drop it. This behavior differs - -- for Snowflake and BigQuery, so multiple dispatch is used. 
- {%- if old_relation is not none and (not exists_as_materialized_view or full_refresh_mode) -%} - {{ handle_existing_table(full_refresh_mode, old_relation) }} - {%- endif -%} - - -- build model - {% call statement('main') -%} - {{ get_create_materialized_view_as_sql(target_relation, sql) }} - {%- endcall %} - {%- endif -%} - - {% set should_revoke = should_revoke(exists_as_materialized_view, full_refresh_mode) %} - {% do apply_grants(target_relation, grant_config, should_revoke) %} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/materializations/table.sql b/dbt/include/databricks/macros/materializations/table.sql index 2b9e90ecb..2e7e86323 100644 --- a/dbt/include/databricks/macros/materializations/table.sql +++ b/dbt/include/databricks/macros/materializations/table.sql @@ -14,7 +14,7 @@ -- setup: if the target relation already exists, drop it -- in case if the existing and future table is delta, we want to do a -- create or replace table instead of dropping, so we don't have the table unavailable - {% if old_relation and not (old_relation.is_delta and config.get('file_format', default='delta') == 'delta') -%} + {% if old_relation and (not (old_relation.is_delta and config.get('file_format', default='delta') == 'delta')) or (old_relation.is_materialized_view or old_relation.is_streaming_table) -%} {{ adapter.drop_relation(old_relation) }} {%- endif %} diff --git a/dbt/include/databricks/macros/relations/components/comment.sql b/dbt/include/databricks/macros/relations/components/comment.sql new file mode 100644 index 000000000..86f26a6b0 --- /dev/null +++ b/dbt/include/databricks/macros/relations/components/comment.sql @@ -0,0 +1,5 @@ +{%- macro get_create_sql_comment(comment) -%} +{% if comment is string -%} + COMMENT '{{ comment }}' +{%- endif -%} +{%- endmacro -%} \ No newline at end of file diff --git 
a/dbt/include/databricks/macros/relations/components/partitioning.sql b/dbt/include/databricks/macros/relations/components/partitioning.sql new file mode 100644 index 000000000..a5597f719 --- /dev/null +++ b/dbt/include/databricks/macros/relations/components/partitioning.sql @@ -0,0 +1,5 @@ +{% macro get_create_sql_partition_by(partition_by) -%} +{%- if partition_by -%} + PARTITIONED BY ({%- for col in partition_by -%}{{ col }}{% if not loop.last %}, {% endif %}{%- endfor %}) +{%- endif -%} +{%- endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/components/refresh_schedule.sql b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql new file mode 100644 index 000000000..618c4a3c3 --- /dev/null +++ b/dbt/include/databricks/macros/relations/components/refresh_schedule.sql @@ -0,0 +1,17 @@ +{% macro get_create_sql_refresh_schedule(cron, time_zone_value) %} + {%- if cron -%} + SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} + {%- endif -%} +{% endmacro %} + +{% macro get_alter_sql_refresh_schedule(cron, time_zone_value, is_altered) %} + {%- if cron -%} + {%- if is_altered -%} + ALTER SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} + {%- else -%} + ADD SCHEDULE CRON '{{ cron }}'{%- if time_zone_value %} AT TIME ZONE '{{ time_zone_value }}'{%- endif -%} + {%- endif -%} + {%- else -%} + DROP SCHEDULE + {%- endif -%} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/components/tblproperties.sql b/dbt/include/databricks/macros/relations/components/tblproperties.sql new file mode 100644 index 000000000..7e6c95faf --- /dev/null +++ b/dbt/include/databricks/macros/relations/components/tblproperties.sql @@ -0,0 +1,9 @@ +{% macro get_create_sql_tblproperties(tblproperties) %} + {%- if tblproperties and tblproperties|length>0 -%} + TBLPROPERTIES ( + {%- for prop in 
tblproperties -%} + '{{ prop }}' = '{{ tblproperties[prop] }}'{%- if not loop.last -%}, {% endif -%} + {% endfor -%} + ) + {%- endif -%} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/drop.sql b/dbt/include/databricks/macros/relations/drop.sql index d0e688d44..4fe95b114 100644 --- a/dbt/include/databricks/macros/relations/drop.sql +++ b/dbt/include/databricks/macros/relations/drop.sql @@ -1,3 +1,9 @@ +{% macro databricks__drop_relation(relation) -%} + {% call statement('drop_relation', auto_begin=False) -%} + {{ get_drop_sql(relation) }} + {%- endcall %} +{% endmacro %} + {% macro databricks__get_drop_sql(relation) -%} {%- if relation.is_materialized_view -%} {{ drop_materialized_view(relation) }} @@ -9,9 +15,3 @@ {{ drop_table(relation) }} {%- endif -%} {% endmacro %} - -{% macro databricks__drop_relation(relation) -%} - {% call statement('drop_relation', auto_begin=False) -%} - {{ get_drop_sql(relation) }} - {%- endcall %} -{% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/materialized_view/alter.sql b/dbt/include/databricks/macros/relations/materialized_view/alter.sql new file mode 100644 index 000000000..55a9599bf --- /dev/null +++ b/dbt/include/databricks/macros/relations/materialized_view/alter.sql @@ -0,0 +1,51 @@ +{% macro get_alter_materialized_view_as_sql( + relation, + configuration_changes, + sql, + existing_relation, + backup_relation, + intermediate_relation +) %} + {{- log('Applying ALTER to: ' ~ relation) -}} + {%- do return(adapter.dispatch('get_alter_materialized_view_as_sql', 'dbt')( + relation, + configuration_changes, + sql, + existing_relation, + backup_relation, + intermediate_relation + )) -%} +{% endmacro %} + + +{%- macro databricks__get_materialized_view_configuration_changes(existing_relation, new_config) -%} + {%- set _existing_materialized_view = adapter.get_relation_config(existing_relation) -%} + {%- set materialized_view = 
adapter.materialized_view_config_from_model(config.model) -%} + {%- set _configuration_changes = materialized_view.get_changeset(_existing_materialized_view) -%} + {% do return(_configuration_changes) %} +{%- endmacro -%} + +{% macro databricks__get_alter_materialized_view_as_sql( + relation, + configuration_changes, + sql, + existing_relation, + backup_relation, + intermediate_relation +) %} + -- apply a full refresh immediately if needed + {% if configuration_changes.requires_full_refresh %} + {% do return(get_replace_sql(existing_relation, relation, sql)) %} + + -- otherwise apply individual changes as needed + {% else %} + {% do return(get_alter_internal(relation, configuration_changes)) %} + {%- endif -%} +{% endmacro %} + +{% macro get_alter_internal(relation, configuration_changes) %} + {%- set refresh = configuration_changes.changes["refresh"] -%} + -- Currently only schedule can be altered + ALTER MATERIALIZED VIEW {{ relation }} + {{ get_alter_sql_refresh_schedule(refresh.cron, refresh.time_zone_value, refresh.is_altered) -}} +{% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/materialized_view/create.sql b/dbt/include/databricks/macros/relations/materialized_view/create.sql index 80043a284..5c18911ec 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/create.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/create.sql @@ -1,5 +1,14 @@ {% macro databricks__get_create_materialized_view_as_sql(relation, sql) -%} + {%- set materialized_view = adapter.materialized_view_config_from_model(config.model) -%} + {%- set partition_by = materialized_view.config["partition_by"].partition_by -%} + {%- set tblproperties = materialized_view.config["tblproperties"].tblproperties -%} + {%- set comment = materialized_view.config["comment"].comment -%} + {%- set refresh = materialized_view.config["refresh"] -%} create materialized view {{ relation }} + {{ 
get_create_sql_partition_by(partition_by) }} + {{ get_create_sql_comment(comment) }} + {{ get_create_sql_tblproperties(tblproperties) }} + {{ get_create_sql_refresh_schedule(refresh.cron, refresh.time_zone_value) }} as {{ sql }} {% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/materialized_view/refresh.sql b/dbt/include/databricks/macros/relations/materialized_view/refresh.sql index 47a99d243..651b3082d 100644 --- a/dbt/include/databricks/macros/relations/materialized_view/refresh.sql +++ b/dbt/include/databricks/macros/relations/materialized_view/refresh.sql @@ -1,7 +1,3 @@ -{% macro get_refresh_materialized_view_sql(relation) -%} - {{ adapter.dispatch('get_refresh_materialized_view_sql', 'dbt')(relation) }} -{%- endmacro %} - -{% macro databricks__get_refresh_materialized_view_sql(relation) -%} +{% macro databricks__refresh_materialized_view(relation) -%} refresh materialized view {{ relation }} {% endmacro %} \ No newline at end of file diff --git a/dbt/include/databricks/macros/relations/replace.sql b/dbt/include/databricks/macros/relations/replace.sql new file mode 100644 index 000000000..91dd99373 --- /dev/null +++ b/dbt/include/databricks/macros/relations/replace.sql @@ -0,0 +1,55 @@ +{% macro get_replace_sql(existing_relation, target_relation, sql) %} + {{- log('Applying REPLACE to: ' ~ existing_relation) -}} + {% do return(adapter.dispatch('get_replace_sql', 'dbt')(existing_relation, target_relation, sql)) %} +{% endmacro %} + +{% macro databricks__get_replace_sql(existing_relation, target_relation, sql) %} + {# /* use a create or replace statement if possible */ #} + + {% set is_replaceable = existing_relation.type == target_relation_type and existing_relation.can_be_replaced %} + + {% if is_replaceable and existing_relation.is_view %} + {{ get_replace_view_sql(target_relation, sql) }} + + {% elif is_replaceable and existing_relation.is_table %} + {{ get_replace_table_sql(target_relation, sql) }} + + {% elif 
is_replaceable and existing_relation.is_materialized_view %} + {{ get_replace_materialized_view_sql(target_relation, sql) }} + + {# /* a create or replace statement is not possible, so try to stage and/or backup to be safe */ #} + + {# /* create target_relation as an intermediate relation, then swap it out with the existing one using a backup */ #} + {%- elif target_relation.can_be_renamed and existing_relation.can_be_renamed -%} + {{ return([ + get_create_intermediate_sql(target_relation, sql), + get_create_backup_sql(existing_relation), + get_rename_intermediate_sql(target_relation), + get_drop_backup_sql(existing_relation) + ]) }} + + {# /* create target_relation as an intermediate relation, then swap it out with the existing one without using a backup */ #} + {%- elif target_relation.can_be_renamed -%} + {{ return([ + get_create_intermediate_sql(target_relation, sql), + get_drop_sql(existing_relation), + get_rename_intermediate_sql(target_relation), + ]) }} + + {# /* create target_relation in place by first backing up the existing relation */ #} + {%- elif existing_relation.can_be_renamed -%} + {{ return([ + get_create_backup_sql(existing_relation), + get_create_sql(target_relation, sql), + get_drop_backup_sql(existing_relation) + ]) }} + + {# /* no renaming is allowed, so just drop and create */ #} + {%- else -%} + {{ return([ + get_drop_sql(existing_relation), + get_create_sql(target_relation, sql) + ]) }} + {%- endif -%} + +{% endmacro %} \ No newline at end of file diff --git a/dev-requirements.txt b/dev-requirements.txt index 22b46a03b..87c22daa3 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -23,8 +23,8 @@ tox>=3.2.0 types-requests types-mock -dbt-core==1.7.1 -dbt-tests-adapter==1.7.1 +dbt-core==1.7.3 +dbt-tests-adapter==1.7.3 # git+https://github.com/dbt-labs/dbt-spark.git@1.5.latest#egg=dbt-spark # git+https://github.com/dbt-labs/dbt-core.git@1.5.latest#egg=dbt-core&subdirectory=core # 
git+https://github.com/dbt-labs/dbt-core.git@1.5.latest#egg=dbt-tests-adapter&subdirectory=tests/adapter diff --git a/tests/functional/adapter/materialized_view_tests/fixtures.py b/tests/functional/adapter/materialized_view_tests/fixtures.py new file mode 100644 index 000000000..558e9075d --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/fixtures.py @@ -0,0 +1,43 @@ +from typing import Optional +from dbt.adapters.base import BaseRelation + +from dbt.adapters.databricks.relation import DatabricksRelationType + + +def query_relation_type(project, relation: BaseRelation) -> Optional[str]: + table_type = project.run_sql( + f"select table_type from {relation.information_schema_only()}." + f"`tables` where table_name = '{relation.identifier}'", + fetch="one", + )[0] + if table_type == "STREAMING_TABLE": + return DatabricksRelationType.StreamingTable.value + elif table_type == "MANAGED" or table_type == "EXTERNAL": + return DatabricksRelationType.Table.value + else: + is_materialized = project.run_sql( + f"select is_materialized from {relation.information_schema_only()}." + f"`views` where table_name = '{relation.identifier}'", + fetch="one", + )[0] + if is_materialized == "TRUE": + return DatabricksRelationType.MaterializedView.value + else: + return DatabricksRelationType.View.value + + +materialized_view = """ +{{ config( + materialized='materialized_view', + description='this is a materialized view', + partition_by='id', + schedule = { + 'cron': '0 0 * * * ? 
*', + 'time_zone': 'Etc/UTC' + }, + tblproperties={ + 'key': 'value' + }, +) }} +select * from {{ ref('my_seed') }} +""" diff --git a/tests/functional/adapter/materialized_view_tests/test_basic.py b/tests/functional/adapter/materialized_view_tests/test_basic.py new file mode 100644 index 000000000..210fb5c9b --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/test_basic.py @@ -0,0 +1,46 @@ +from typing import Optional, Tuple +from dbt.tests.adapter.materialized_view.basic import MaterializedViewBasic +from dbt.adapters.base.relation import BaseRelation +from dbt.tests import util +import pytest + +from tests.functional.adapter.materialized_view_tests import fixtures + + +class TestMaterializedViewsMixin: + @staticmethod + def insert_record(project, table: BaseRelation, record: Tuple[int, int]) -> None: + project.run_sql(f"insert into {table} values {record}") + + @staticmethod + def refresh_materialized_view(project, materialized_view: BaseRelation) -> None: + util.run_dbt(["run", "--models", str(materialized_view.identifier)]) + + @staticmethod + def query_row_count(project, relation: BaseRelation) -> int: + return project.run_sql(f"select count(*) from {relation}", fetch="one")[0] + + @staticmethod + def query_relation_type(project, relation: BaseRelation) -> Optional[str]: + return fixtures.query_relation_type(project, relation) + + +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViews(TestMaterializedViewsMixin, MaterializedViewBasic): + def test_table_replaces_materialized_view(self, project, my_materialized_view): + util.run_dbt(["run", "--models", my_materialized_view.identifier]) + assert self.query_relation_type(project, my_materialized_view) == "materialized_view" + + self.swap_materialized_view_to_table(project, my_materialized_view) + + util.run_dbt(["run", "--models", my_materialized_view.identifier]) + # assert self.query_relation_type(project, my_materialized_view) == "table" + + def 
test_view_replaces_materialized_view(self, project, my_materialized_view): + util.run_dbt(["run", "--models", my_materialized_view.identifier]) + assert self.query_relation_type(project, my_materialized_view) == "materialized_view" + + self.swap_materialized_view_to_view(project, my_materialized_view) + + util.run_dbt(["run", "--models", my_materialized_view.identifier]) + # assert self.query_relation_type(project, my_materialized_view) == "view" diff --git a/tests/functional/adapter/materialized_view_tests/test_changes.py b/tests/functional/adapter/materialized_view_tests/test_changes.py new file mode 100644 index 000000000..4243b4273 --- /dev/null +++ b/tests/functional/adapter/materialized_view_tests/test_changes.py @@ -0,0 +1,96 @@ +from typing import Optional +from dbt.tests.adapter.materialized_view.changes import ( + MaterializedViewChanges, + MaterializedViewChangesApplyMixin, + MaterializedViewChangesContinueMixin, + MaterializedViewChangesFailMixin, +) +from dbt.adapters.base import BaseRelation +from dbt.tests import util +import pytest +from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig +from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig + +from tests.functional.adapter.materialized_view_tests import fixtures + + +def _check_tblproperties(tblproperties: TblPropertiesConfig, expected: dict): + final_tblproperties = { + k: v for k, v in tblproperties.tblproperties.items() if not k.startswith("pipeline") + } + assert final_tblproperties == expected + + +class MaterializedViewChangesMixin(MaterializedViewChanges): + @pytest.fixture(scope="class", autouse=True) + def models(self): + return {"my_materialized_view.sql": fixtures.materialized_view} + + @staticmethod + def check_start_state(project, materialized_view): + with util.get_connection(project.adapter): + results = project.adapter.get_relation_config(materialized_view) + assert isinstance(results, MaterializedViewConfig) + 
assert results.config["partition_by"].partition_by == ["id"] + assert results.config["query"].query.startswith("select * from") + _check_tblproperties(results.config["tblproperties"], {"key": "value"}) + assert results.config["refresh"].cron == "0 0 * * * ? *" + assert results.config["refresh"].time_zone_value == "Etc/UTC" + + @staticmethod + def change_config_via_alter(project, materialized_view): + initial_model = util.get_model_file(project, materialized_view) + new_model = initial_model.replace("'cron': '0 0 * * * ? *'", "'cron': '0 5 * * * ? *'") + util.set_model_file(project, materialized_view, new_model) + + @staticmethod + def check_state_alter_change_is_applied(project, materialized_view): + with util.get_connection(project.adapter): + results = project.adapter.get_relation_config(materialized_view) + assert isinstance(results, MaterializedViewConfig) + assert results.config["refresh"].cron == "0 5 * * * ? *" + assert results.config["refresh"].time_zone_value == "Etc/UTC" + + @staticmethod + def change_config_via_replace(project, materialized_view): + initial_model = util.get_model_file(project, materialized_view) + new_model = ( + initial_model.replace("partition_by='id',", "") + .replace("select *", "select id, value") + .replace("'key': 'value'", "'other': 'other'") + ) + util.set_model_file(project, materialized_view, new_model) + + @staticmethod + def check_state_replace_change_is_applied(project, materialized_view): + with util.get_connection(project.adapter): + results = project.adapter.get_relation_config(materialized_view) + assert isinstance(results, MaterializedViewConfig) + assert results.config["partition_by"].partition_by == [] + assert results.config["query"].query.startswith("select id, value") + _check_tblproperties(results.config["tblproperties"], {"other": "other"}) + + @staticmethod + def query_relation_type(project, relation: BaseRelation) -> Optional[str]: + return fixtures.query_relation_type(project, relation) + + 
+@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewApplyChanges( + MaterializedViewChangesMixin, MaterializedViewChangesApplyMixin +): + pass + + +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewContinueOnChanges( + MaterializedViewChangesMixin, MaterializedViewChangesContinueMixin +): + pass + + +@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster") +class TestMaterializedViewFailOnChanges( + MaterializedViewChangesMixin, MaterializedViewChangesFailMixin +): + pass diff --git a/tests/unit/macros/relation_configs/base.py b/tests/unit/macros/relation_configs/base.py new file mode 100644 index 000000000..63b7111f4 --- /dev/null +++ b/tests/unit/macros/relation_configs/base.py @@ -0,0 +1,32 @@ +from typing import Any +from jinja2 import Environment, FileSystemLoader +import pytest + + +class RelationConfigTestBase: + @pytest.fixture(scope="class") + def env(self) -> Environment: + """ + The environment used for rendering Databricks macros + """ + return Environment( + loader=FileSystemLoader("dbt/include/databricks/macros/relations/components"), + extensions=["jinja2.ext.do"], + ) + + @pytest.fixture(scope="class") + def template_name(self) -> str: + """ + The name of the Databricks template you want to test, not including the path. + + Example: "adapters.sql" + """ + raise NotImplementedError("Must be implemented by subclasses") + + @pytest.fixture + def template(self, template_name, env) -> Any: + """ + This creates the template you will test against. + You generally don't want to override this. 
+ """ + return env.get_template(template_name).module diff --git a/tests/unit/macros/relation_configs/test_comment_macros.py b/tests/unit/macros/relation_configs/test_comment_macros.py new file mode 100644 index 000000000..119012460 --- /dev/null +++ b/tests/unit/macros/relation_configs/test_comment_macros.py @@ -0,0 +1,20 @@ +import pytest +from tests.unit.macros.relation_configs.base import RelationConfigTestBase + + +class TestCommentMacros(RelationConfigTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "comment.sql" + + def test_get_create_sql_comment__with_no_comment(self, template): + s = template.get_create_sql_comment(None) + assert s == "" + + def test_get_create_sql_comment__with_empty_comment(self, template): + s = template.get_create_sql_comment("") + assert s == "COMMENT ''" + + def test_get_create_sql_comment__with_comment(self, template): + s = template.get_create_sql_comment("test_comment") + assert s == "COMMENT 'test_comment'" diff --git a/tests/unit/macros/relation_configs/test_partitioning_macros.py b/tests/unit/macros/relation_configs/test_partitioning_macros.py new file mode 100644 index 000000000..b502f58c6 --- /dev/null +++ b/tests/unit/macros/relation_configs/test_partitioning_macros.py @@ -0,0 +1,24 @@ +import pytest +from tests.unit.macros.relation_configs.base import RelationConfigTestBase + + +class TestPartitioningMacros(RelationConfigTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "partitioning.sql" + + def test_get_create_sql_partition_by__empty(self, template): + s = template.get_create_sql_partition_by([]) + assert s == "" + + def test_get_create_sql_partition_by__none(self, template): + s = template.get_create_sql_partition_by(None) + assert s == "" + + def test_get_create_sql_partition_by__single(self, template): + s = template.get_create_sql_partition_by(["id"]) + assert s == "PARTITIONED BY (id)" + + def test_get_create_sql_partition_by__multiple(self, 
template): + s = template.get_create_sql_partition_by(["id", "value"]) + assert s == "PARTITIONED BY (id, value)" diff --git a/tests/unit/macros/relation_configs/test_refresh_schedule_macros.py b/tests/unit/macros/relation_configs/test_refresh_schedule_macros.py new file mode 100644 index 000000000..cf11d3ad1 --- /dev/null +++ b/tests/unit/macros/relation_configs/test_refresh_schedule_macros.py @@ -0,0 +1,40 @@ +import pytest +from tests.unit.macros.relation_configs.base import RelationConfigTestBase + + +class TestRefreshScheduleMacros(RelationConfigTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "refresh_schedule.sql" + + def test_get_create_sql_refresh_schedule__manual(self, template): + s = template.get_create_sql_refresh_schedule(None, None) + assert s == "" + + def test_get_create_sql_refresh_schedule__cron_only(self, template): + s = template.get_create_sql_refresh_schedule("*/5 * * * * ?", None) + assert s == "SCHEDULE CRON '*/5 * * * * ?'" + + def test_get_create_sql_refresh_schedule__with_time_zone(self, template): + s = template.get_create_sql_refresh_schedule("*/5 * * * * ?", "UTC") + assert s == "SCHEDULE CRON '*/5 * * * * ?' AT TIME ZONE 'UTC'" + + def test_get_alter_sql_refresh_schedule__manual(self, template): + s = template.get_alter_sql_refresh_schedule(None, None, False) + assert s == "DROP SCHEDULE" + + def test_get_alter_sql_refresh_schedule__add_cron_only(self, template): + s = template.get_alter_sql_refresh_schedule("*/5 * * * * ?", None, False) + assert s == "ADD SCHEDULE CRON '*/5 * * * * ?'" + + def test_get_alter_sql_refresh_schedule__add_with_time_zone(self, template): + s = template.get_alter_sql_refresh_schedule("*/5 * * * * ?", "UTC", False) + assert s == "ADD SCHEDULE CRON '*/5 * * * * ?' 
AT TIME ZONE 'UTC'" + + def test_get_alter_sql_refresh_schedule__alter_cron_only(self, template): + s = template.get_alter_sql_refresh_schedule("*/5 * * * * ?", None, True) + assert s == "ALTER SCHEDULE CRON '*/5 * * * * ?'" + + def test_get_alter_sql_refresh_schedule__alter_with_time_zone(self, template): + s = template.get_alter_sql_refresh_schedule("*/5 * * * * ?", "UTC", True) + assert s == "ALTER SCHEDULE CRON '*/5 * * * * ?' AT TIME ZONE 'UTC'" diff --git a/tests/unit/macros/relation_configs/test_tblproperties_macros.py b/tests/unit/macros/relation_configs/test_tblproperties_macros.py new file mode 100644 index 000000000..60ff71f0d --- /dev/null +++ b/tests/unit/macros/relation_configs/test_tblproperties_macros.py @@ -0,0 +1,24 @@ +import pytest +from tests.unit.macros.relation_configs.base import RelationConfigTestBase + + +class TestTblPropertiesMacros(RelationConfigTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "tblproperties.sql" + + def test_get_create_sql_tblproperties__empty(self, template): + s = template.get_create_sql_tblproperties({}) + assert s == "" + + def test_get_create_sql_tblproperties__none(self, template): + s = template.get_create_sql_tblproperties(None) + assert s == "" + + def test_get_create_sql_tblproperties__single(self, template): + s = template.get_create_sql_tblproperties({"key": "value"}) + assert s == "TBLPROPERTIES ('key' = 'value')" + + def test_get_create_sql_tblproperties__multiple(self, template): + s = template.get_create_sql_tblproperties({"key": "value", "other": "other_value"}) + assert s == "TBLPROPERTIES ('key' = 'value', 'other' = 'other_value')" diff --git a/tests/unit/relation_configs/test_comment.py b/tests/unit/relation_configs/test_comment.py new file mode 100644 index 000000000..6329641a8 --- /dev/null +++ b/tests/unit/relation_configs/test_comment.py @@ -0,0 +1,58 @@ +from mock import Mock +from agate import Table +from dbt.adapters.databricks.relation_configs.comment 
import CommentConfig, CommentProcessor + + +class TestCommentProcessor: + def test_from_results__no_comment(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ["Schema:", "default", None], + ["Table:", "table_abc", None], + ] + ) + } + config = CommentProcessor.from_results(results) + assert config == CommentConfig() + + def test_from_results__with_comment(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ["Schema:", "default", None], + ["Table:", "table_abc", None], + ["Comment", "This is the table comment", None], + ] + ) + } + config = CommentProcessor.from_results(results) + assert config == CommentConfig(comment="This is the table comment") + + def test_from_model_node__no_comment(self): + model_node = Mock() + model_node.description = None + config = CommentProcessor.from_model_node(model_node) + assert config == CommentConfig() + + def test_from_model_node__empty_comment(self): + model_node = Mock() + model_node.description = "" + config = CommentProcessor.from_model_node(model_node) + assert config == CommentConfig(comment="") + + def test_from_model_node__comment(self): + model_node = Mock() + model_node.description = "a comment" + config = CommentProcessor.from_model_node(model_node) + assert config == CommentConfig(comment="a comment") diff --git a/tests/unit/relation_configs/test_config_base.py b/tests/unit/relation_configs/test_config_base.py new file mode 100644 index 000000000..2f99307e7 --- /dev/null +++ b/tests/unit/relation_configs/test_config_base.py @@ -0,0 +1,47 @@ +from dbt.adapters.databricks.relation_configs.base import ( + DatabricksComponentConfig, + 
DatabricksRelationChangeSet, +) + +from dbt.adapters.databricks.relation_configs.comment import CommentConfig +from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig + + +class MockComponentConfig(DatabricksComponentConfig): + data: int = 1 + + @property + def requires_full_refresh(self) -> bool: + return True + + +class TestDatabricksComponentConfig: + def test_get_diff__valid_type(self): + config = MockComponentConfig() + other = MockComponentConfig(data=2) + + assert config.get_diff(other) == config + + +class TestDatabricksRelationChangeSet: + def test_requires_full_refresh__no_changes(self): + changeset = DatabricksRelationChangeSet(changes={}) + assert not changeset.requires_full_refresh + + def test_requires_full_refresh__has_only_alterable_changes(self): + changeset = DatabricksRelationChangeSet(changes={"refresh": RefreshConfig()}) + assert not changeset.requires_full_refresh + + def test_requires_full_refresh__has_an_inalterable_change(self): + changeset = DatabricksRelationChangeSet( + changes={"comment": CommentConfig(), "refresh": RefreshConfig()} + ) + assert changeset.requires_full_refresh + + def test_has_changes__no_changes(self): + changeset = DatabricksRelationChangeSet(changes={}) + assert not changeset.has_changes + + def test_has_changes__has_changes(self): + changeset = DatabricksRelationChangeSet(changes={"refresh": RefreshConfig()}) + assert changeset.has_changes diff --git a/tests/unit/relation_configs/test_materialized_view_config.py b/tests/unit/relation_configs/test_materialized_view_config.py new file mode 100644 index 000000000..4d10a72a7 --- /dev/null +++ b/tests/unit/relation_configs/test_materialized_view_config.py @@ -0,0 +1,119 @@ +from agate import Table, Row +from mock import Mock +from dbt.adapters.databricks.relation_configs.comment import CommentConfig +from dbt.adapters.databricks.relation_configs.materialized_view import MaterializedViewConfig +from dbt.adapters.databricks.relation_configs.partitioning 
import PartitionedByConfig +from dbt.adapters.databricks.relation_configs.query import QueryConfig +from dbt.adapters.databricks.relation_configs.refresh import RefreshConfig +from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig + + +class TestMaterializedViewConfig: + def test_from_results(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + ["# Partition Information", None, None], + ["# col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + ["col_b", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ["Comment", "This is the table comment", None], + ["Refresh Schedule", "MANUAL", None], + ] + ), + "information_schema.views": Row( + ["select * from foo", "other"], ["view_definition", "comment"] + ), + "show_tblproperties": Table(rows=[["prop", "1"], ["other", "other"]]), + } + + config = MaterializedViewConfig.from_results(results) + + assert config == MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(), + "query": QueryConfig(query="select * from foo"), + } + ) + + def test_from_model_node(self): + model = Mock() + model.compiled_code = "select * from foo" + model.config.extra = { + "partition_by": ["col_a", "col_b"], + "tblproperties": { + "prop": "1", + "other": "other", + }, + } + model.description = "This is the table comment" + + config = MaterializedViewConfig.from_model_node(model) + + assert config == MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": 
TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(), + "query": QueryConfig(query="select * from foo"), + } + ) + + def test_get_changeset__no_changes(self): + old = MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(), + "query": QueryConfig(query="select * from foo"), + } + ) + new = MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(), + "query": QueryConfig(query="select * from foo"), + } + ) + + assert new.get_changeset(old) is None + + def test_get_changeset__some_changes(self): + old = MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a", "col_b"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(), + "query": QueryConfig(query="select * from foo"), + } + ) + new = MaterializedViewConfig( + config={ + "partition_by": PartitionedByConfig(partition_by=["col_a"]), + "comment": CommentConfig(comment="This is the table comment"), + "tblproperties": TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}), + "refresh": RefreshConfig(cron="*/5 * * * *"), + "query": QueryConfig(query="select * from foo"), + } + ) + + changeset = new.get_changeset(old) + assert changeset.has_changes + assert changeset.requires_full_refresh + assert changeset.changes == { + "partition_by": PartitionedByConfig(partition_by=["col_a"]), + "refresh": RefreshConfig(cron="*/5 * * * *"), + } diff --git 
a/tests/unit/relation_configs/test_partitioning.py b/tests/unit/relation_configs/test_partitioning.py new file mode 100644 index 000000000..9fb01f126 --- /dev/null +++ b/tests/unit/relation_configs/test_partitioning.py @@ -0,0 +1,81 @@ +from mock import Mock +from agate import Table + +from dbt.adapters.databricks.relation_configs.partitioning import ( + PartitionedByConfig, + PartitionedByProcessor, +) + + +class TestPartitionedByProcessor: + def test_from_results__none(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ] + ) + } + + spec = PartitionedByProcessor.from_results(results) + assert spec == PartitionedByConfig(partition_by=[]) + + def test_from_results__single(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + ["# Partition Information", None, None], + ["# col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ] + ) + } + + spec = PartitionedByProcessor.from_results(results) + assert spec == PartitionedByConfig(partition_by=["col_a"]) + + def test_from_results__multiple(self): + results = { + "describe_extended": Table( + rows=[ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + ["# Partition Information", None, None], + ["# col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + ["col_b", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ] + ) + } + spec = PartitionedByProcessor.from_results(results) + assert spec == PartitionedByConfig(partition_by=["col_a", "col_b"]) + + def 
test_from_model_node__without_partition_by(self): + model = Mock() + model.config.extra = {} + spec = PartitionedByProcessor.from_model_node(model) + assert spec == PartitionedByConfig(partition_by=[]) + + def test_from_model_node__single_column(self): + model = Mock() + model.config.extra = {"partition_by": "col_a"} + spec = PartitionedByProcessor.from_model_node(model) + assert spec == PartitionedByConfig(partition_by=["col_a"]) + + def test_from_model_node__multiple_columns(self): + model = Mock() + model.config.extra = {"partition_by": ["col_a", "col_b"]} + spec = PartitionedByProcessor.from_model_node(model) + assert spec == PartitionedByConfig(partition_by=["col_a", "col_b"]) diff --git a/tests/unit/relation_configs/test_query.py b/tests/unit/relation_configs/test_query.py new file mode 100644 index 000000000..c3d9c412b --- /dev/null +++ b/tests/unit/relation_configs/test_query.py @@ -0,0 +1,30 @@ +from agate import Row +from mock import Mock +import pytest +from dbt.exceptions import DbtRuntimeError +from dbt.adapters.databricks.relation_configs.query import QueryConfig, QueryProcessor + +sql = "select * from foo" + + +class TestQueryProcessor: + def test_from_results(self): + results = {"information_schema.views": Row([sql, "other"], ["view_definition", "comment"])} + spec = QueryProcessor.from_results(results) + assert spec == QueryConfig(query=sql) + + def test_from_model_node__with_query(self): + model = Mock() + model.compiled_code = sql + spec = QueryProcessor.from_model_node(model) + assert spec == QueryConfig(query=sql) + + def test_from_model_node__without_query(self): + model = Mock() + model.compiled_code = None + model.unique_id = "1" + with pytest.raises( + DbtRuntimeError, + match="Cannot compile model 1 with no SQL query", + ): + _ = QueryProcessor.from_model_node(model) diff --git a/tests/unit/relation_configs/test_refresh.py b/tests/unit/relation_configs/test_refresh.py new file mode 100644 index 000000000..5dddb3a7d --- /dev/null +++ 
b/tests/unit/relation_configs/test_refresh.py @@ -0,0 +1,100 @@ +from typing import Any, List +from mock import Mock +import pytest +from dbt.adapters.databricks.relation_configs.refresh import ( + RefreshProcessor, + RefreshConfig, +) +from dbt.exceptions import DbtRuntimeError +from agate import Table + + +class TestRefreshProcessor: + @pytest.fixture + def rows(self) -> List[List[Any]]: + return [ + ["col_name", "data_type", "comment"], + ["col_a", "int", "This is a comment"], + [None, None, None], + ["# Detailed Table Information", None, None], + ["Catalog:", "default", None], + ["Schema:", "default", None], + ["Table:", "table_abc", None], + ] + + def test_from_results__valid_schedule(self, rows): + results = { + "describe_extended": Table( + rows=rows + [["Refresh Schedule", "CRON '*/5 * * * *' AT TIME ZONE 'UTC'"]] + ) + } + spec = RefreshProcessor.from_results(results) + assert spec == RefreshConfig(cron="*/5 * * * *", time_zone_value="UTC") + + def test_from_results__manual(self, rows): + results = {"describe_extended": Table(rows=rows + [["Refresh Schedule", "MANUAL"]])} + spec = RefreshProcessor.from_results(results) + assert spec == RefreshConfig() + + def test_from_results__invalid(self, rows): + results = { + "describe_extended": Table(rows=rows + [["Refresh Schedule", "invalid description"]]) + } + with pytest.raises( + DbtRuntimeError, + match="Could not parse schedule from description: invalid description", + ): + RefreshProcessor.from_results(results) + + def test_from_model_node__without_schedule(self): + model = Mock() + model.config.extra = {} + spec = RefreshProcessor.from_model_node(model) + assert spec == RefreshConfig() + + def test_from_model_node__without_cron(self): + model = Mock() + model.config.extra = {"schedule": {"time_zone_value": "UTC"}} + with pytest.raises( + DbtRuntimeError, + match="Schedule config must contain a 'cron' key, got {'time_zone_value': 'UTC'}", + ): + RefreshProcessor.from_model_node(model) + + def 
test_from_model_node__without_timezone(self): + model = Mock() + model.config.extra = {"schedule": {"cron": "*/5 * * * *"}} + spec = RefreshProcessor.from_model_node(model) + assert spec == RefreshConfig(cron="*/5 * * * *", time_zone_value=None) + + def test_process_model_node__both(self): + model = Mock() + model.config.extra = {"schedule": {"cron": "*/5 * * * *", "time_zone_value": "UTC"}} + spec = RefreshProcessor.from_model_node(model) + assert spec == RefreshConfig(cron="*/5 * * * *", time_zone_value="UTC") + + +class TestRefreshConfig: + def test_get_diff__scheduled_other_manual_refresh(self): + config = RefreshConfig(cron="*/5 * * * *") + other = RefreshConfig() + diff = config.get_diff(other) + assert diff == RefreshConfig(cron="*/5 * * * *", is_altered=False) + + def test_get_diff__scheduled_other_scheduled_refresh(self): + config = RefreshConfig(cron="*/5 * * * *") + other = RefreshConfig(cron="0 * * * *") + diff = config.get_diff(other) + assert diff == RefreshConfig(cron="*/5 * * * *", is_altered=True) + + def test_get_diff__manual_other_scheduled_refresh(self): + config = RefreshConfig() + other = RefreshConfig(cron="*/5 * * * *") + diff = config.get_diff(other) + assert diff == config + + def test_get_diff__manual_other_manual_refresh(self): + config = RefreshConfig() + other = RefreshConfig() + diff = config.get_diff(other) + assert diff is None diff --git a/tests/unit/relation_configs/test_tblproperties.py b/tests/unit/relation_configs/test_tblproperties.py new file mode 100644 index 000000000..a1929bba9 --- /dev/null +++ b/tests/unit/relation_configs/test_tblproperties.py @@ -0,0 +1,55 @@ +from mock import Mock +import pytest +from agate import Table + +from dbt.adapters.databricks.relation_configs.tblproperties import ( + TblPropertiesConfig, + TblPropertiesProcessor, +) +from dbt.exceptions import DbtRuntimeError + + +class TestTblPropertiesProcessor: + def test_from_results__none(self): + results = {"show_tblproperties": None} + spec = 
TblPropertiesProcessor.from_results(results) + assert spec == TblPropertiesConfig(tblproperties={}) + + def test_from_results__single(self): + results = {"show_tblproperties": Table(rows=[["prop", "f1"]])} + spec = TblPropertiesProcessor.from_results(results) + assert spec == TblPropertiesConfig(tblproperties={"prop": "f1"}) + + def test_from_results__multiple(self): + results = {"show_tblproperties": Table(rows=[["prop", "1"], ["other", "other"]])} + spec = TblPropertiesProcessor.from_results(results) + assert spec == TblPropertiesConfig(tblproperties={"prop": "1", "other": "other"}) + + def test_from_model_node__without_tblproperties(self): + model = Mock() + model.config.extra = {} + spec = TblPropertiesProcessor.from_model_node(model) + assert spec == TblPropertiesConfig(tblproperties={}) + + def test_from_model_node__with_tblproperties(self): + model = Mock() + model.config.extra = { + "tblproperties": {"prop": 1}, + } + spec = TblPropertiesProcessor.from_model_node(model) + assert spec == TblPropertiesConfig(tblproperties={"prop": "1"}) + + def test_from_model_node__with_empty_tblproperties(self): + model = Mock() + model.config.extra = {"tblproperties": {}} + spec = TblPropertiesProcessor.from_model_node(model) + assert spec == TblPropertiesConfig(tblproperties={}) + + def test_from_model_node__with_incorrect_tblproperties(self): + model = Mock() + model.config.extra = {"tblproperties": True} + with pytest.raises( + DbtRuntimeError, + match="tblproperties must be a dictionary", + ): + _ = TblPropertiesProcessor.from_model_node(model) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 9beb93ef6..32658e2c1 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -1,6 +1,8 @@ import unittest -from dbt.adapters.databricks.utils import redact_credentials, remove_ansi +import pytest +from agate import Table, Row +from dbt.adapters.databricks.utils import redact_credentials, remove_ansi, get_first_row class 
TestDatabricksUtils(unittest.TestCase): @@ -87,3 +89,15 @@ def test_remove_ansi(self): 72 # how to execute python model in notebook """ self.assertEqual(remove_ansi(test_string), expected_string) + + +@pytest.mark.parametrize( + "input,expected", + [ + (Table([]), Row(set())), + (Table([Row(["first", "row"]), Row(["second", "row"])]), Row(["first", "row"])), + ], +) +def test_get_first_row(input, expected): + first_row = get_first_row(input) + assert first_row == expected diff --git a/tox.ini b/tox.ini index 5adee10af..20c9e4826 100644 --- a/tox.ini +++ b/tox.ini @@ -37,8 +37,8 @@ deps = [testenv:integration-databricks-cluster] basepython = python3 commands = - /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster -n4 tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' - /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster -n4 tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_cluster -n auto tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v --profile databricks_cluster -n auto tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' passenv = DBT_* PYTEST_ADDOPTS @@ -50,8 +50,8 @@ allowlist_externals = /bin/bash [testenv:integration-databricks-uc-cluster] basepython = python3 commands = - /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_uc_cluster -n4 tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' - /bin/bash -c '{envpython} -m pytest -v --profile databricks_uc_cluster -n4 tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_uc_cluster -n auto tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v --profile databricks_uc_cluster -n 
auto tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' passenv = DBT_* PYTEST_ADDOPTS @@ -63,8 +63,8 @@ allowlist_externals = /bin/bash [testenv:integration-databricks-uc-sql-endpoint] basepython = python3 commands = - /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_uc_sql_endpoint -n4 tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' - /bin/bash -c '{envpython} -m pytest -v --profile databricks_uc_sql_endpoint -n4 tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v -m profile_databricks_uc_sql_endpoint -n auto tests/integration/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' + /bin/bash -c '{envpython} -m pytest -v --profile databricks_uc_sql_endpoint -n auto --dist loadscope tests/functional/adapter/* {posargs}; ret=$?; [ $ret = 5 ] && exit 0 || exit $ret' passenv = DBT_* PYTEST_ADDOPTS