Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to warehouse-per-model logging and tests #511

Merged
merged 2 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import threading
import time
import requests
from threading import get_ident
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -1120,21 +1121,34 @@ def _get_compute_name(node: Optional[ResultNode]) -> Optional[str]:


def _get_http_path(node: Optional[ResultNode], creds: DatabricksCredentials) -> Optional[str]:
# Get the http path of the compute resource specified in the node's config.
# If none is specified return the default path from creds.
thread_id = (os.getpid(), get_ident())

# If there is no node we return the http_path for the default compute.
if not node:
logger.debug(f"Thread {thread_id}: using default compute resource.")
return creds.http_path

# Get the name of the compute resource specified in the node's config.
# If none is specified return the http_path for the default compute.
compute_name = _get_compute_name(node)
if not node or not compute_name:
logger.debug("Using default compute resource.")
if not compute_name:
logger.debug(f"On thread {thread_id}: {node.relation_name} using default compute resource.")
return creds.http_path

# Get the http_path for the named compute.
http_path = None
if creds.compute:
logger.debug(f"Using compute resource {compute_name}.")
http_path = creds.compute.get(compute_name, {}).get("http_path", None)

# no http_path for the named compute resource is an error condition
if not http_path:
raise dbt.exceptions.DbtRuntimeError(
f"Compute resource {compute_name} does not exist, relation: {node.relation_name}"
f"Compute resource {compute_name} does not exist or "
f"does not specify http_path, relation: {node.relation_name}"
)

logger.debug(
f"On thread {thread_id}: {node.relation_name} using compute resource '{compute_name}'."
)

return http_path
26 changes: 26 additions & 0 deletions tests/functional/adapter/warehouse_per_model/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,33 @@
- name: date
"""

seed_properties = """
version: 2

seeds:
- name: source
config:
databricks_compute: alternate_warehouse2
"""

expected_target = """id,name,date
1,Alice,2022-01-01
2,Bob,2022-01-02
"""

target_snap = """
{% snapshot target_snap %}

{{
config(
target_schema='snapshots',
unique_key='id',
strategy='check',
check_cols=['id', 'name', 'date'],
databricks_compute='alternate_warehouse3'
)
}}
select * from {{ ref('target') }}

{% endsnapshot %}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,34 @@ class TestWarehousePerModel(BaseWarehousePerModel):
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["compute"] = {
"alternate_warehouse": {"http_path": dbt_profile_target["http_path"]}
"alternate_warehouse": {"http_path": dbt_profile_target["http_path"]},
"alternate_warehouse2": {"http_path": dbt_profile_target["http_path"]},
"alternate_warehouse3": {"http_path": dbt_profile_target["http_path"]},
}
return {"test": {"outputs": outputs, "target": "default"}}

@pytest.fixture(scope="class")
def seeds(self):
return {
"source.csv": fixtures.source,
"properties.yml": fixtures.seed_properties,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"target_snap.sql": fixtures.target_snap,
}

def test_wpm(self, project):
util.run_dbt(["seed"])
util.run_dbt(["run", "--select", "target"])
_, log = util.run_dbt_and_capture(["--debug", "seed"])
assert "`source` using compute resource 'alternate_warehouse2'" in log

_, log = util.run_dbt_and_capture(["--debug", "run", "--select", "target", "target3"])
assert "`target` using compute resource 'alternate_warehouse'" in log
assert "`target3` using default compute resource" in log

_, log = util.run_dbt_and_capture(["--debug", "snapshot"])
assert "`target_snap` using compute resource 'alternate_warehouse3'" in log

util.check_relations_equal(project.adapter, ["target", "source"])
22 changes: 13 additions & 9 deletions tests/unit/test_compute_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
class TestDatabricksConnectionHTTPPath(unittest.TestCase):
"""Test the various cases for determining a specified warehouse."""

errMsg = (
"Compute resource foo does not exist or does not specify http_path, " "relation: a_relation"
)

def test_get_http_path_model(self):
default_path = "my_http_path"
creds = connections.DatabricksCredentials(http_path=default_path)
Expand Down Expand Up @@ -42,21 +46,21 @@ def test_get_http_path_model(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down Expand Up @@ -99,21 +103,21 @@ def test_get_http_path_seed(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down Expand Up @@ -155,21 +159,21 @@ def test_get_http_path_snapshot(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down