NOTE(review): line structure restored from a whitespace-collapsed copy of this patch.
Blank lines were placed so that every hunk matches its @@ line counts; leading indentation
is inferred from syntax -- verify context lines against the target tree before applying.

diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py
index 233bdab4d..5ce424434 100644
--- a/dbt/adapters/databricks/connections.py
+++ b/dbt/adapters/databricks/connections.py
@@ -11,6 +11,7 @@
 import threading
 import time
 import requests
+from threading import get_ident
 from typing import (
     Any,
     Callable,
@@ -1120,21 +1121,34 @@ def _get_compute_name(node: Optional[ResultNode]) -> Optional[str]:
 
 
 def _get_http_path(node: Optional[ResultNode], creds: DatabricksCredentials) -> Optional[str]:
-    # Get the http path of the compute resource specified in the node's config.
-    # If none is specified return the default path from creds.
+    thread_id = (os.getpid(), get_ident())
+
+    # If there is no node we return the http_path for the default compute.
+    if not node:
+        logger.debug(f"Thread {thread_id}: using default compute resource.")
+        return creds.http_path
+
+    # Get the name of the compute resource specified in the node's config.
+    # If none is specified return the http_path for the default compute.
     compute_name = _get_compute_name(node)
-    if not node or not compute_name:
-        logger.debug("Using default compute resource.")
+    if not compute_name:
+        logger.debug(f"On thread {thread_id}: {node.relation_name} using default compute resource.")
         return creds.http_path
 
+    # Get the http_path for the named compute.
     http_path = None
     if creds.compute:
-        logger.debug(f"Using compute resource {compute_name}.")
         http_path = creds.compute.get(compute_name, {}).get("http_path", None)
 
+    # no http_path for the named compute resource is an error condition
     if not http_path:
         raise dbt.exceptions.DbtRuntimeError(
-            f"Compute resource {compute_name} does not exist, relation: {node.relation_name}"
+            f"Compute resource {compute_name} does not exist or "
+            f"does not specify http_path, relation: {node.relation_name}"
         )
 
+    logger.debug(
+        f"On thread {thread_id}: {node.relation_name} using compute resource '{compute_name}'."
+    )
+
     return http_path
diff --git a/tests/functional/adapter/warehouse_per_model/fixtures.py b/tests/functional/adapter/warehouse_per_model/fixtures.py
index 7a5f9fa1b..60a38e4c4 100644
--- a/tests/functional/adapter/warehouse_per_model/fixtures.py
+++ b/tests/functional/adapter/warehouse_per_model/fixtures.py
@@ -44,7 +44,33 @@
       - name: date
 """
 
+seed_properties = """
+version: 2
+
+seeds:
+  - name: source
+    config:
+      databricks_compute: alternate_warehouse2
+"""
+
 expected_target = """id,name,date
 1,Alice,2022-01-01
 2,Bob,2022-01-02
 """
+
+target_snap = """
+{% snapshot target_snap %}
+
+{{
+    config(
+      target_schema='snapshots',
+      unique_key='id',
+      strategy='check',
+      check_cols=['id', 'name', 'date'],
+      databricks_compute='alternate_warehouse3'
+    )
+}}
+select * from {{ ref('target') }}
+
+{% endsnapshot %}
+"""
diff --git a/tests/functional/adapter/warehouse_per_model/test_warehouse_per_model.py b/tests/functional/adapter/warehouse_per_model/test_warehouse_per_model.py
index 608d58b8d..a8ee8d333 100644
--- a/tests/functional/adapter/warehouse_per_model/test_warehouse_per_model.py
+++ b/tests/functional/adapter/warehouse_per_model/test_warehouse_per_model.py
@@ -90,11 +90,34 @@ class TestWarehousePerModel(BaseWarehousePerModel):
     def profiles_config_update(self, dbt_profile_target):
         outputs = {"default": dbt_profile_target}
         outputs["default"]["compute"] = {
-            "alternate_warehouse": {"http_path": dbt_profile_target["http_path"]}
+            "alternate_warehouse": {"http_path": dbt_profile_target["http_path"]},
+            "alternate_warehouse2": {"http_path": dbt_profile_target["http_path"]},
+            "alternate_warehouse3": {"http_path": dbt_profile_target["http_path"]},
         }
         return {"test": {"outputs": outputs, "target": "default"}}
 
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {
+            "source.csv": fixtures.source,
+            "properties.yml": fixtures.seed_properties,
+        }
+
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {
+            "target_snap.sql": fixtures.target_snap,
+        }
+
     def test_wpm(self, project):
-        util.run_dbt(["seed"])
-        util.run_dbt(["run", "--select", "target"])
+        _, log = util.run_dbt_and_capture(["--debug", "seed"])
+        assert "`source` using compute resource 'alternate_warehouse2'" in log
+
+        _, log = util.run_dbt_and_capture(["--debug", "run", "--select", "target", "target3"])
+        assert "`target` using compute resource 'alternate_warehouse'" in log
+        assert "`target3` using default compute resource" in log
+
+        _, log = util.run_dbt_and_capture(["--debug", "snapshot"])
+        assert "`target_snap` using compute resource 'alternate_warehouse3'" in log
+
         util.check_relations_equal(project.adapter, ["target", "source"])
diff --git a/tests/unit/test_compute_config.py b/tests/unit/test_compute_config.py
index 6af95b751..5c9d35e23 100644
--- a/tests/unit/test_compute_config.py
+++ b/tests/unit/test_compute_config.py
@@ -7,6 +7,10 @@
 class TestDatabricksConnectionHTTPPath(unittest.TestCase):
     """Test the various cases for determining a specified warehouse."""
 
+    errMsg = (
+        "Compute resource foo does not exist or does not specify http_path, " "relation: a_relation"
+    )
+
     def test_get_http_path_model(self):
         default_path = "my_http_path"
         creds = connections.DatabricksCredentials(http_path=default_path)
@@ -42,21 +46,21 @@ def test_get_http_path_model(self):
         node.config._extra["databricks_compute"] = "foo"
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {"foo": {}}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
@@ -99,21 +103,21 @@ def test_get_http_path_seed(self):
         node.config._extra["databricks_compute"] = "foo"
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {"foo": {}}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
@@ -155,21 +159,21 @@ def test_get_http_path_snapshot(self):
         node.config._extra["databricks_compute"] = "foo"
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 
         creds.compute = {"foo": {}}
         with self.assertRaisesRegex(
             dbt.exceptions.DbtRuntimeError,
-            "Compute resource foo does not exist, relation: a_relation",
+            self.errMsg,
         ):
             connections._get_http_path(node, creds)
 