Skip to content

Commit

Permalink
Updates to warehouse-per-model logging and tests (#511)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcypher-databricks authored Nov 20, 2023
2 parents c0a9416 + 1e3b37d commit ceae0e4
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 18 deletions.
26 changes: 20 additions & 6 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import threading
import time
import requests
from threading import get_ident
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -1120,21 +1121,34 @@ def _get_compute_name(node: Optional[ResultNode]) -> Optional[str]:


def _get_http_path(node: Optional[ResultNode], creds: DatabricksCredentials) -> Optional[str]:
# Get the http path of the compute resource specified in the node's config.
# If none is specified return the default path from creds.
thread_id = (os.getpid(), get_ident())

# If there is no node we return the http_path for the default compute.
if not node:
logger.debug(f"Thread {thread_id}: using default compute resource.")
return creds.http_path

# Get the name of the compute resource specified in the node's config.
# If none is specified return the http_path for the default compute.
compute_name = _get_compute_name(node)
if not node or not compute_name:
logger.debug("Using default compute resource.")
if not compute_name:
logger.debug(f"On thread {thread_id}: {node.relation_name} using default compute resource.")
return creds.http_path

# Get the http_path for the named compute.
http_path = None
if creds.compute:
logger.debug(f"Using compute resource {compute_name}.")
http_path = creds.compute.get(compute_name, {}).get("http_path", None)

# no http_path for the named compute resource is an error condition
if not http_path:
raise dbt.exceptions.DbtRuntimeError(
f"Compute resource {compute_name} does not exist, relation: {node.relation_name}"
f"Compute resource {compute_name} does not exist or "
f"does not specify http_path, relation: {node.relation_name}"
)

logger.debug(
f"On thread {thread_id}: {node.relation_name} using compute resource '{compute_name}'."
)

return http_path
26 changes: 26 additions & 0 deletions tests/functional/adapter/warehouse_per_model/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,33 @@
- name: date
"""

seed_properties = """
version: 2
seeds:
- name: source
config:
databricks_compute: alternate_warehouse2
"""

expected_target = """id,name,date
1,Alice,2022-01-01
2,Bob,2022-01-02
"""

target_snap = """
{% snapshot target_snap %}
{{
config(
target_schema='snapshots',
unique_key='id',
strategy='check',
check_cols=['id', 'name', 'date'],
databricks_compute='alternate_warehouse3'
)
}}
select * from {{ ref('target') }}
{% endsnapshot %}
"""
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,34 @@ class TestWarehousePerModel(BaseWarehousePerModel):
def profiles_config_update(self, dbt_profile_target):
outputs = {"default": dbt_profile_target}
outputs["default"]["compute"] = {
"alternate_warehouse": {"http_path": dbt_profile_target["http_path"]}
"alternate_warehouse": {"http_path": dbt_profile_target["http_path"]},
"alternate_warehouse2": {"http_path": dbt_profile_target["http_path"]},
"alternate_warehouse3": {"http_path": dbt_profile_target["http_path"]},
}
return {"test": {"outputs": outputs, "target": "default"}}

@pytest.fixture(scope="class")
def seeds(self):
return {
"source.csv": fixtures.source,
"properties.yml": fixtures.seed_properties,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {
"target_snap.sql": fixtures.target_snap,
}

def test_wpm(self, project):
util.run_dbt(["seed"])
util.run_dbt(["run", "--select", "target"])
_, log = util.run_dbt_and_capture(["--debug", "seed"])
assert "`source` using compute resource 'alternate_warehouse2'" in log

_, log = util.run_dbt_and_capture(["--debug", "run", "--select", "target", "target3"])
assert "`target` using compute resource 'alternate_warehouse'" in log
assert "`target3` using default compute resource" in log

_, log = util.run_dbt_and_capture(["--debug", "snapshot"])
assert "`target_snap` using compute resource 'alternate_warehouse3'" in log

util.check_relations_equal(project.adapter, ["target", "source"])
22 changes: 13 additions & 9 deletions tests/unit/test_compute_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
class TestDatabricksConnectionHTTPPath(unittest.TestCase):
"""Test the various cases for determining a specified warehouse."""

errMsg = (
"Compute resource foo does not exist or does not specify http_path, " "relation: a_relation"
)

def test_get_http_path_model(self):
default_path = "my_http_path"
creds = connections.DatabricksCredentials(http_path=default_path)
Expand Down Expand Up @@ -42,21 +46,21 @@ def test_get_http_path_model(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down Expand Up @@ -99,21 +103,21 @@ def test_get_http_path_seed(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down Expand Up @@ -155,21 +159,21 @@ def test_get_http_path_snapshot(self):
node.config._extra["databricks_compute"] = "foo"
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

creds.compute = {"foo": {}}
with self.assertRaisesRegex(
dbt.exceptions.DbtRuntimeError,
"Compute resource foo does not exist, relation: a_relation",
self.errMsg,
):
connections._get_http_path(node, creds)

Expand Down

0 comments on commit ceae0e4

Please sign in to comment.