From a13dd53544ed91d8555ba09b740e4d9459c35e12 Mon Sep 17 00:00:00 2001 From: Ben Cassell Date: Thu, 5 Dec 2024 13:37:29 -0800 Subject: [PATCH] trying to merge main --- dbt/adapters/databricks/api_client.py | 15 +++++++++------ dbt/adapters/databricks/connections.py | 4 ---- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/dbt/adapters/databricks/api_client.py b/dbt/adapters/databricks/api_client.py index bacfc608..6b937cf6 100644 --- a/dbt/adapters/databricks/api_client.py +++ b/dbt/adapters/databricks/api_client.py @@ -267,7 +267,7 @@ def _convert_to_sdk_types(self, job_settings: dict[str, Any]) -> dict[str, Any]: def _get_exception(self, run: Run, run_id: int) -> None: try: - run_id = utils.if_some(run.tasks, lambda x: x[0].run_id) or run_id + run_id = utils.if_some(run.tasks, lambda x: x[0].run_id if x else None) or run_id # type: ignore output = self.wc.jobs.get_run_output(run_id) raise DbtRuntimeError( "Python model failed with traceback as:\n" @@ -281,8 +281,8 @@ def _get_exception(self, run: Run, run_id: int) -> None: if isinstance(e, DbtRuntimeError): raise e else: - result_state = utils.if_some(run.state, lambda s: s.result_state) or "" - state_message = utils.if_some(run.state, lambda s: s.state_message) or "" + result_state = utils.if_some(run.state, lambda s: s.result_state) or "" # type: ignore + state_message = utils.if_some(run.state, lambda s: s.state_message) or "" # type: ignore raise DbtRuntimeError( f"Python model run ended in state {result_state} " f"with state_message\n{state_message}" @@ -391,7 +391,10 @@ def poll_for_completion(self, pipeline_id: str) -> None: if response.cause: raise DbtRuntimeError(f"Pipeline {pipeline_id} failed: {response.cause}") else: - latest_update = utils.if_some(response.latest_updates, lambda x: x[0]) + latest_update = utils.if_some( + response.latest_updates, + lambda x: x[0] if x else None, # type: ignore + ) last_error = self.get_update_error(pipeline_id, latest_update) raise DbtRuntimeError(f"Pipeline {pipeline_id} failed: {last_error}") @@ -401,7 +404,7 @@ def get_update_error(self, pipeline_id: str, update_id: str) -> str: e for e in events if e.event_type == "update_progress" - and utils.if_some(e.origin, lambda x: x.update_id == update_id) + and utils.if_some(e.origin, lambda x: x.update_id == update_id) # type: ignore ] error_events = [e.error for e in update_events if e.error] @@ -411,7 +414,7 @@ def get_update_error(self, pipeline_id: str, update_id: str) -> str: msg = ( utils.if_some( error_events[0].exceptions, - lambda x: "\n".join(map(lambda y: y.message or "", x)), + lambda x: "\n".join(map(lambda y: y.message or "", x)), # type: ignore ) or "" ) diff --git a/dbt/adapters/databricks/connections.py b/dbt/adapters/databricks/connections.py index 7ee9af34..ba33f893 100644 --- a/dbt/adapters/databricks/connections.py +++ b/dbt/adapters/databricks/connections.py @@ -35,7 +35,6 @@ from dbt.adapters.databricks.__version__ import version as __version__ from dbt.adapters.databricks.api_client import DatabricksApiClient from dbt.adapters.databricks.credentials import ( - BearerAuth, DatabricksCredentialManager, DatabricksCredentials, ) @@ -392,9 +391,6 @@ def __init__(self, profile: AdapterRequiredConfig, mp_context: SpawnContext): def cancel_open(self) -> list[str]: cancelled = super().cancel_open() - creds = cast(DatabricksCredentials, self.profile.credentials) - assert self.credentials_manager - api_client = DatabricksApiClient.create(self.credentials_manager.api_client, creds, 15 * 60) logger.info("Cancelling open python jobs") PythonRunTracker.cancel_runs(self.api_client) return cancelled