Skip to content

Commit

Permalink
add ability to disable transaction control from parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
xxiong2902 committed Jan 10, 2024
1 parent e0f9c9f commit 629bd3b
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 18 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/cockroachdb/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.7.7"
version = "1.7.8"
62 changes: 46 additions & 16 deletions dbt/adapters/cockroachdb/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import AdapterResponse
from dbt.events import AdapterLogger
from dbt.events.contextvars import get_node_info
from dbt.events.functions import fire_event
from dbt.events.types import SQLCommit

from dbt.helper_types import Port
from dataclasses import dataclass
Expand Down Expand Up @@ -36,6 +39,7 @@ class CockroachdbCredentials(Credentials):
sslrootcert: Optional[str] = None
application_name: Optional[str] = "dbt"
retries: int = 1
enable_transaction: Optional[bool] = True

_ALIASES = {"dbname": "database", "pass": "password"}

Expand Down Expand Up @@ -64,6 +68,7 @@ def _connection_keys(self):
"sslrootcert",
"application_name",
"retries",
"enable_transaction"
)


Expand Down Expand Up @@ -147,7 +152,8 @@ def connect():
# started at the first command execution.
# Reference:
# - https://www.psycopg.org/docs/connection.html#connection.autocommit
handle.set_session(autocommit=False)
# handle.set_session(autocommit=False)
handle.set_session(autocommit=True)

if credentials.role:
handle.cursor().execute("set role {}".format(credentials.role))
Expand Down Expand Up @@ -186,14 +192,21 @@ def cancel(self, connection):
# probably bad, re-raise it
raise

sql = "select pg_terminate_backend({})".format(pid)
try:
# cockroachdb does not support pg_terminate_backend yet so we will
# try our best to terminate the backend, but likely will not work till
# support is implemented in cockroachdb
# https://github.com/cockroachdb/cockroach/issues/35897
sql = "select pg_terminate_backend({})".format(pid)

logger.debug("Cancelling query '{}' ({})".format(connection_name, pid))
logger.debug("Cancelling query '{}' ({})".format(connection_name, pid))

_, cursor = self.add_query(sql)
res = cursor.fetchone()
_, cursor = self.add_query(sql)
res = cursor.fetchone()

logger.debug("Cancel query '{}': {}".format(connection_name, res))
logger.debug("Cancel query '{}': {}".format(connection_name, res))
except:
pass

@classmethod
def get_credentials(cls, credentials):
Expand Down Expand Up @@ -223,17 +236,34 @@ def begin(self):
"it already had one open!".format(connection.name)
)

# We will attempt to close any created transaction by commiting first
# This is to work around the issue where CockroachDB is seeing an active transaction already.
# Reference:
# - https://github.com/cockroachdb/cockroach/issues/41513
# - https://github.com/cockroachdb/cockroach/issues/54954
try:
self.add_commit_query()
except Exception:
pass
if connection.credentials.enable_transaction:
# We will attempt to close any created transaction by commiting first
# This is to work around the issue where CockroachDB is seeing an active transaction already.
# Reference:
# - https://github.com/cockroachdb/cockroach/issues/41513
# - https://github.com/cockroachdb/cockroach/issues/54954
try:
self.add_commit_query()
except Exception:
pass

self.add_begin_query()
self.add_begin_query()

connection.transaction_open = True
return connection

def commit(self):
connection = self.get_thread_connection()
if connection.transaction_open is False:
raise dbt.exceptions.DbtInternalError(
'Tried to commit transaction on connection "{}", but '
"it does not have one open!".format(connection.name)
)

if connection.credentials.enable_transaction:
fire_event(SQLCommit(conn_name=connection.name, node_info=get_node_info()))
self.add_commit_query()

connection.transaction_open = False

return connection
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _dbt_psycopg2_name():
# For test build we should suffix it with -#, like: 1.7.5-1.
# This will generate a build version as 1.7.5.post1.
# When updating the package_version here, you must update the version in dbt.adapters.cockroachdb.__version__ as well
package_version = "1.7.7"
package_version = "1.7.8"
description = """The cockroachdb adapter plugin for dbt (data build tool)"""

this_directory = os.path.abspath(os.path.dirname(__file__))
Expand Down

0 comments on commit 629bd3b

Please sign in to comment.