Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy bug fix #27

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion universql/plugins/snow.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ def convert_properties(self, file_format, snowflake_property_name, snowflake_pro
def _format_value_for_duckdb(self, file_format, snowflake_property_name, data):
snowflake_type = data["snowflake_property_type"]
duckdb_type = data["duckdb_property_type"]
snowflake_value = data["snowflake_property_value"]
snowflake_value = str(data["snowflake_property_value"])
if snowflake_property_name in ["date_format", "timestamp_format"]:
duckdb_value = snowflake_value
for snowflake_datetime_component, duckdb_datetime_component in SNOWFLAKE_TO_DUCKDB_DATETIME_MAPPINGS.items():
Expand Down
39 changes: 22 additions & 17 deletions universql/warehouse/duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ def _get_property(self, ast: sqlglot.exp.Create, name: str):

def execute(self, ast: sqlglot.exp.Expression, catalog_executor: Executor, locations: Tables) -> typing.Optional[
Locations]:
cache_directory = self.catalog.context.get('cache_directory')

if not catalog_executor.is_warm():
# since duckdb doesn't implement auth layer,
# force the catalog to perform auth before executing any query
Expand Down Expand Up @@ -354,22 +356,26 @@ def execute(self, ast: sqlglot.exp.Expression, catalog_executor: Executor, locat
if not is_temp:
return {destination_table: ast.expression}
elif isinstance(ast, Insert):
print("it is an insert")
table_type = self.catalog._get_table_location(destination_table)
if table_type == TableType.ICEBERG:
namespace = self.catalog.iceberg_catalog.properties.get('namespace')
try:
iceberg_table = self.catalog.iceberg_catalog.load_table((namespace, full_table))
except NoSuchTableError as e:
raise QueryError(f"Error accessing catalog {e.args}")
self.execute_raw(self._sync_and_transform_query(ast.expression, locations).sql(dialect="duckdb"),
catalog_executor)
table = self.get_as_table()
iceberg_table.append(table)
elif table_type.LOCAL:
self.execute_raw(self._sync_and_transform_query(ast, locations).sql(dialect="duckdb"),
catalog_executor)
else:
raise QueryError("Unable to determine table type")
try:
if table_type == TableType.ICEBERG:
namespace = self.catalog.iceberg_catalog.properties.get('namespace')
try:
iceberg_table = self.catalog.iceberg_catalog.load_table((namespace, full_table))
except NoSuchTableError as e:
raise QueryError(f"Error accessing catalog {e.args}")
self.execute_raw(self._sync_and_transform_query(ast.expression, locations).sql(dialect="duckdb"),
catalog_executor)
table = self.get_as_table()
iceberg_table.append(table)
elif table_type.LOCAL:
self.execute_raw(self._sync_and_transform_query(ast, locations).sql(dialect="duckdb"),
catalog_executor)
else:
raise QueryError("Unable to determine table type")
finally:
self._destroy_cache(cache_directory)
elif isinstance(ast, Drop):
delete_table = ast.this
self.catalog.iceberg_catalog.drop_table(delete_table.sql())
Expand All @@ -392,12 +398,11 @@ def execute(self, ast: sqlglot.exp.Expression, catalog_executor: Executor, locat
self.catalog.emulator.execute(ast.sql(dialect="snowflake"))
catalog_executor.catalog.clear_cache()
elif isinstance(ast, Copy):
print("it is a copy")
sql = self._sync_and_transform_query(ast, locations).sql(dialect="duckdb", pretty=True)

insert_into_select_ast = sqlglot.parse_one(sql, dialect='duckdb')
# refactor soon
cache_directory = self.catalog.context.get('cache_directory')

try:
self.execute_raw(sql, catalog_executor, is_raw=isinstance(ast, Copy))
finally:
Expand Down
Loading