Skip to content

Commit

Permalink
expose a little bit
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db committed Jan 23, 2025
1 parent d60c661 commit c882882
Show file tree
Hide file tree
Showing 13 changed files with 251 additions and 224 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dbt.adapters.base import AdapterPlugin
from dbt.adapters.databricks.credentials import DatabricksCredentials
from dbt.adapters.databricks.connection.credentials import DatabricksCredentials
from dbt.adapters.databricks.impl import DatabricksAdapter
from dbt.include import databricks

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/databricks/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from dbt.adapters.databricks import utils
from dbt.adapters.databricks.__version__ import version
from dbt.adapters.databricks.auth import BearerAuth
from dbt.adapters.databricks.credentials import DatabricksCredentials
from dbt.adapters.databricks.connection.credentials import DatabricksCredentials
from dbt.adapters.databricks.logging import logger

DEFAULT_POLLING_INTERVAL = 10
Expand Down
53 changes: 28 additions & 25 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from contextlib import contextmanager
from dataclasses import dataclass
from multiprocessing.context import SpawnContext
from numbers import Number
from typing import TYPE_CHECKING, Any, Optional, cast

from dbt_common.events.contextvars import get_node_info
Expand All @@ -26,8 +25,11 @@
)
from dbt.adapters.databricks.__version__ import version as __version__
from dbt.adapters.databricks.api_client import DatabricksApiClient
from dbt.adapters.databricks.credentials import DatabricksCredentials, TCredentialProvider
from dbt.adapters.databricks.dbsql import DatabricksHandle
from dbt.adapters.databricks.connection.credentials import (
DatabricksCredentials,
TCredentialProvider,
)
from dbt.adapters.databricks.connection.handle import CursorWrapper, DatabricksHandle
from dbt.adapters.databricks.events.connection_events import (
ConnectionAcquire,
ConnectionCreate,
Expand Down Expand Up @@ -284,24 +286,23 @@ def add_query(
pre = time.time()

handle: DatabricksHandle = connection.handle
handle.execute(sql, bindings)
cursor = handle.execute(sql, bindings)

fire_event(
SQLQueryStatus(
status=str(handle.get_response()),
status=str(cursor.get_response()),
elapsed=round((time.time() - pre), 2),
node_info=get_node_info(),
)
)

return connection, handle
return connection, cursor
except Error:
if handle is not None:
handle.close_cursor()
close_cursor = True
raise
finally:
if close_cursor and handle is not None:
handle.close_cursor()
if close_cursor and cursor is not None:
cursor.close()

def execute(
self,
Expand All @@ -311,21 +312,23 @@ def execute(
limit: Optional[int] = None,
) -> tuple[AdapterResponse, "Table"]:
sql = self._add_query_comment(sql)
_, handle = self.add_query(sql, auto_begin)
_, cursor = self.add_query(sql, auto_begin)
try:
response = handle.get_response()
response = cursor.get_response()
if fetch:
table = self.get_result_from_cursor(handle, limit)
table = self.get_result_from_cursor(cursor, limit)
else:
# Lazy import agate to improve CLI startup time
from dbt_common.clients import agate_helper

table = agate_helper.empty_table()
return response, table
finally:
handle.close_cursor()
cursor.close()

def _execute_with_handle(self, log_sql: str, f: Callable[[DatabricksHandle], None]) -> "Table":
def _execute_with_cursor(
self, log_sql: str, f: Callable[[DatabricksHandle], CursorWrapper]
) -> "Table":
connection = self.get_thread_connection()

fire_event(ConnectionUsed(conn_type=self.TYPE, conn_name=cast_to_str(connection.name)))
Expand All @@ -343,34 +346,34 @@ def _execute_with_handle(self, log_sql: str, f: Callable[[DatabricksHandle], Non
pre = time.time()

handle: DatabricksHandle = connection.handle
f(handle)
cursor = f(handle)

fire_event(
SQLQueryStatus(
status=str(self.get_response(handle)),
status=str(self.get_response(cursor)),
elapsed=round((time.time() - pre), 2),
node_info=get_node_info(),
)
)

return self.get_result_from_cursor(handle, None)
finally:
if handle is not None:
handle.close_cursor()
if cursor is not None:
cursor.close()

def list_schemas(self, database: str, schema: Optional[str] = None) -> "Table":
database = database.strip("`")
if schema:
schema = schema.strip("`").lower()
return self._execute_with_handle(
return self._execute_with_cursor(
f"GetSchemas(database={database}, schema={schema})",
lambda cursor: cursor.list_schemas(database=database, schema=schema),
)

def list_tables(self, database: str, schema: str) -> "Table":
database = database.strip("`")
schema = schema.strip("`").lower()
return self._execute_with_handle(
return self._execute_with_cursor(
f"GetTables(database={database}, schema={schema})",
lambda cursor: cursor.list_tables(database=database, schema=schema),
)
Expand Down Expand Up @@ -570,9 +573,9 @@ def close(cls, connection: Connection) -> Connection:
return connection

@classmethod
def get_response(cls, handle: Any) -> AdapterResponse:
if isinstance(handle, DatabricksHandle):
return handle.get_response()
def get_response(cls, cursor: Any) -> AdapterResponse:
if isinstance(cursor, CursorWrapper):
return cursor.get_response()
else:
return AdapterResponse("OK")

Expand Down Expand Up @@ -768,7 +771,7 @@ def _get_max_idle_time(query_header_context: Any, creds: DatabricksCredentials)
"connect_max_idle", max_idle_time
)

if not isinstance(max_idle_time, Number):
if not isinstance(max_idle_time, int):
if isinstance(max_idle_time, str) and max_idle_time.strip().isnumeric():
return int(max_idle_time.strip())
else:
Expand Down
4 changes: 2 additions & 2 deletions dbt/adapters/databricks/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
from typing import Any, Optional, Union, cast

import keyring
from dbt_common.exceptions import DbtConfigError, DbtValidationError

from databricks.sdk.core import CredentialsProvider
from databricks.sdk.oauth import OAuthClient, SessionCredentials
from dbt_common.exceptions import DbtConfigError, DbtValidationError

from dbt.adapters.contracts.connection import Credentials
from dbt.adapters.databricks.auth import m2m_auth, token_auth
from dbt.adapters.databricks.events.credential_events import (
Expand Down
182 changes: 0 additions & 182 deletions dbt/adapters/databricks/dbsql.py

This file was deleted.

Loading

0 comments on commit c882882

Please sign in to comment.