Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding missing 1.9 Snapshot behavior #904

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## dbt-databricks 1.9.2 (TBD)

### Features

- Update snapshot materialization to support new snapshot features ([904](https://github.com/databricks/dbt-databricks/pull/904))

### Under the Hood

- Refactor global state reading ([888](https://github.com/databricks/dbt-databricks/pull/888))
Expand Down
88 changes: 36 additions & 52 deletions dbt/include/databricks/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,27 +1,4 @@
{% macro databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{#
  Create (or replace) a view holding the snapshot staging rows and return its relation.

  Args:
    strategy, sql: forwarded unchanged to snapshot_staging_table to produce the select.
    target_relation: the snapshot target; also forwarded, and its identifier, schema and
      database are reused to name and place the staging view.

  Returns:
    The Relation for the staging view, identified as <target identifier>__dbt_tmp.
#}
{% set tmp_identifier = target_relation.identifier ~ '__dbt_tmp' %}

{# The staging relation is placed in the same schema/database as the target and typed as a view. #}
{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=target_relation.schema,
database=target_relation.database,
type='view') -%}

{% set select = snapshot_staging_table(strategy, sql, target_relation) %}

{# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
{% call statement('build_snapshot_staging_relation') %}
create or replace view {{ tmp_relation }}
as
{{ select }}
{% endcall %}

{# Hand the relation object (not the SQL) back to the caller. #}
{% do return(tmp_relation) %}
{% endmacro %}


{% materialization snapshot, adapter='databricks' %}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
Expand Down Expand Up @@ -62,47 +39,43 @@
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", model['config'], target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set build_or_select_sql = build_sql %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model, for_relation=False) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set columns = config.get("snapshot_table_column_names") or get_snapshot_table_column_names() %}

{% if target_relation.database is none %}
{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% else %}
{% set staging_table = databricks_build_snapshot_staging_table(strategy, sql, target_relation) %}
{% endif %}
{{ adapter.assert_valid_snapshot_target_given_strategy(target_relation, columns, strategy) }}

{% set build_or_select_sql = snapshot_staging_table(strategy, sql, target_relation) %}
{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set remove_columns = ['dbt_change_type', 'DBT_CHANGE_TYPE', 'dbt_unique_key', 'DBT_UNIQUE_KEY'] %}
{% if unique_key | is_list %}
{% for key in strategy.unique_key %}
{{ remove_columns.append('dbt_unique_key_' + loop.index|string) }}
{{ remove_columns.append('DBT_UNIQUE_KEY_' + loop.index|string) }}
{% endfor %}
{% endif %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| rejectattr('name', 'in', remove_columns)
| list %}

{% set quoted_source_columns = [] %}
Expand All @@ -117,23 +90,34 @@
)
%}

{% call statement_with_staging_table('main', staging_table) %}
{{ final_sql }}
{% endcall %}
{% endif %}

{% do persist_docs(target_relation, model, for_relation=True) %}

{% endif %}
{{ check_time_data_types(build_or_select_sql) }}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_constraints(target_relation, model) %}
{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{% do persist_constraints(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import pytest

from dbt.tests.adapter.simple_snapshot.new_record_mode import (
_delete_sql,
_invalidate_sql,
_ref_snapshot_sql,
_seed_new_record_mode,
_snapshot_actual_sql,
_snapshots_yml,
_update_sql,
)
from dbt.tests.util import check_relations_equal, run_dbt


class TestDatabricksSnapshotNewRecordMode:
    """Run the shared "new record mode" snapshot scenario against Databricks.

    The SQL scripts come from dbt's shared adapter test suite. Each fixture
    rewrites its script with plain string replacement (``text`` -> ``string``,
    dropping ``BEGIN``/``END`` wrappers) before it is executed.
    NOTE(review): presumably the upstream scripts target a dialect that allows
    multi-statement blocks; confirm against the upstream module.
    """

    @pytest.fixture(scope="class")
    def snapshots(self):
        """Snapshot files for the test project."""
        return {"snapshot.sql": _snapshot_actual_sql}

    @pytest.fixture(scope="class")
    def models(self):
        """Model files for the test project."""
        return {
            "snapshots.yml": _snapshots_yml,
            "ref_snapshot.sql": _ref_snapshot_sql,
        }

    @pytest.fixture(scope="class")
    def seed_new_record_mode(self):
        """Raw seed script; the test adapts and splits it before running."""
        return _seed_new_record_mode

    @pytest.fixture(scope="class")
    def invalidate_sql_1(self):
        """Text before the first ``;`` of the invalidate script, ``BEGIN`` removed."""
        return _invalidate_sql.split(";", 1)[0].replace("BEGIN", "")

    @pytest.fixture(scope="class")
    def invalidate_sql_2(self):
        """Text after the first ``;`` of the invalidate script, ``END`` and ``;`` removed."""
        return _invalidate_sql.split(";", 1)[1].replace("END", "").replace(";", "")

    @pytest.fixture(scope="class")
    def update_sql(self):
        """Update script with ``text`` replaced by ``string``."""
        return _update_sql.replace("text", "string")

    @pytest.fixture(scope="class")
    def delete_sql(self):
        """Delete script, used unchanged."""
        return _delete_sql

    def test_snapshot_new_record_mode(
        self,
        project,
        seed_new_record_mode,
        invalidate_sql_1,
        invalidate_sql_2,
        update_sql,
        delete_sql,
    ):
        """Seed, snapshot, mutate, re-snapshot and compare, then delete and snapshot again."""
        # The seed script is executed one statement at a time after adapting
        # type names and stripping the BEGIN/END wrapper.
        seed_statements = (
            seed_new_record_mode.replace("text", "string")
            .replace("TEXT", "STRING")
            .replace("BEGIN", "")
            .replace("END;", "")
            .replace(" WITHOUT TIME ZONE", "")
            .split(";")
        )
        for sql in seed_statements:
            project.run_sql(sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1

        project.run_sql(invalidate_sql_1)
        project.run_sql(invalidate_sql_2)
        project.run_sql(update_sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1

        check_relations_equal(project.adapter, ["snapshot_actual", "snapshot_expected"])

        # Go through the fixture (not the module-level constant) so that a
        # subclass overriding `delete_sql` actually changes what is executed.
        project.run_sql(delete_sql)

        results = run_dbt(["snapshot"])
        assert len(results) == 1
Loading
Loading