Skip to content

Commit

Permalink
detect snowflake metadata queries
Browse files Browse the repository at this point in the history
  • Loading branch information
buremba committed Aug 6, 2024
1 parent 2b1f5e8 commit 55cae89
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 30 deletions.
4 changes: 3 additions & 1 deletion universql/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,10 @@ async def login_request(request: Request) -> JSONResponse:
credentials["database"] = params.get('databaseName')
if "warehouse" not in credentials:
credentials["warehouse"] = params.get('warehouse')
if "warehouse" not in credentials:
if "role" not in credentials:
credentials["role"] = params.get('roleName')
if "schema" not in credentials:
credentials["schema"] = params.get('schemaName')

token = str(uuid4())
message = None
Expand Down
66 changes: 37 additions & 29 deletions universql/warehouse/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
logger = logging.getLogger("🐥")



class UniverSQLSession:
def __init__(self, context, token, credentials: dict, session_parameters: dict):
self.context = context
Expand Down Expand Up @@ -104,33 +103,42 @@ def _do_query(self, raw_query: str) -> (str, List, pyarrow.Table):
last_compute = Compute.SNOWFLAKE
else:
tables = list(ast.find_all(sqlglot.exp.Table))

locations = None
try:
locations = self.catalog.get_table_references(self.duckdb, tables)
except DatabaseError as e:
local_error_message = (f"Unable to find location of Iceberg tables. "
f"See: https://github.com/buremba/universql#cant-query-native-snowflake-tables. Cause: {e.msg}")

transformed_ast = self.sync_duckdb_catalog(locations,
simplify(ast)) if locations is not None else None
if transformed_ast is None:
last_compute = None
break

sql = transformed_ast.sql(dialect="duckdb", pretty=True)
try:
logger.info(f"[{self.token}] executing DuckDB query:\n{prepend_to_lines(sql)}")
self.duckdb_emulator.execute(sql)
last_compute = Compute.LOCAL
except duckdb.Error as e:
local_error_message = f"Unable to run the parse locally on DuckDB. {e.args}"
last_compute = None
break
except DatabaseError as e:
local_error_message = f"Unable to run the query locally on DuckDB. {e.msg}"
last_compute = None
break
for table in tables:
if (len(table.parts) == 1 and self.credentials.get('schema') == "information_schema"
or len(table.parts) > 1 and isinstance(table.parts[-2].this, str) and
table.parts[-2].this.lower() == 'information_schema'
or len(table.parts) > 2 and isinstance(table.parts[-3].this, str)
and table.parts[-3].this.lower() == "snowflake"):
last_compute = None
break

if last_compute is not None:
locations = None
try:
locations = self.catalog.get_table_references(self.duckdb, tables)
except DatabaseError as e:
local_error_message = (f"Unable to find location of Iceberg tables. "
f"See: https://github.com/buremba/universql#cant-query-native-snowflake-tables. Cause: {e.msg}")

transformed_ast = self.sync_duckdb_catalog(locations,
simplify(ast)) if locations is not None else None
if transformed_ast is None:
last_compute = None
break

sql = transformed_ast.sql(dialect="duckdb", pretty=True)
try:
logger.info(f"[{self.token}] executing DuckDB query:\n{prepend_to_lines(sql)}")
self.duckdb_emulator.execute(sql)
last_compute = Compute.LOCAL
except duckdb.Error as e:
local_error_message = f"Unable to run the parse locally on DuckDB. {e.args}"
last_compute = None
break
except DatabaseError as e:
local_error_message = f"Unable to run the query locally on DuckDB. {e.msg}"
last_compute = None
break

if last_compute == Compute.SNOWFLAKE:
return self.get_snowflake_result()
Expand All @@ -150,7 +158,7 @@ def do_snowflake_query(self, queries, raw_query, start_time, local_error_message
self.snowflake.execute(queries, raw_query)
logger.info(f"[{self.token}] Query is done. ({get_friendly_time_since(start_time)})")
except SnowflakeError as e:
final_error = f" {'[DuckDB]: '+local_error_message+'. ' if local_error_message else ''}\n [Snowflake]: {e.message}"
final_error = f" {'[DuckDB]: ' + local_error_message + '. ' if local_error_message else ''}\n [Snowflake]: {e.message}"
if self.compute == Compute.LOCAL.value:
final_error = "The query without warehouse failed to run remotely, and the compute is set to LOCAL.\n" + final_error

Expand Down

0 comments on commit 55cae89

Please sign in to comment.