diff --git a/dags/ingest_csv_to_iceberg2.py b/dags/ingest_csv_to_iceberg2.py
index 4daade86..c98a5b80 100644
--- a/dags/ingest_csv_to_iceberg2.py
+++ b/dags/ingest_csv_to_iceberg2.py
@@ -4,12 +4,11 @@
 import logging
 import pendulum
 import pyarrow.parquet as pq
-import boto3
 import json
 import s3fs
 from random import randint
 from airflow import DAG
-from airflow.operators.python import get_current_context, task
+from airflow.operators.python import get_current_context
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.utils.trigger_rule import TriggerRule
 from airflow.hooks.base import BaseHook
@@ -18,8 +17,6 @@
 from modules.databases.duckdb import s3_csv_to_parquet
 from modules.utils.s3 import s3_delete
 from modules.utils.s3 import s3_create_bucket
-from modules.utils.sql import escape_dataset
-from modules.utils.sha1 import sha1
 from modules.utils.minioevent import unpack_minio_event
 from modules.databases.trino import (
     create_schema,
@@ -32,16 +29,19 @@
     validate_s3_key
 )
 
+
 def random_with_N_digits(n):
     range_start = 10**(n-1)
     range_end = (10**n)-1
     return randint(range_start, range_end)
 
+
 def sha1(value):
     sha_1 = hashlib.sha1()
     sha_1.update(str(value).encode('utf-8'))
     return sha_1.hexdigest()
 
+
 def pyarrow_to_trino_schema(schema):
     trino_schema = []
 
@@ -60,7 +60,7 @@ def pyarrow_to_trino_schema(schema):
             else:
                 trino_type = 'INTEGER'
         elif field_type.startswith('Float'):
-            trino_type = 'DOUBLE'
+            trino_type = 'DOUBLE'
         elif field_type.startswith('String'):
             trino_type = 'VARCHAR'
         elif field_type == 'Object':
@@ -76,13 +76,11 @@ def pyarrow_to_trino_schema(schema):
         else:
             # Use VARCHAR as default for unsupported types
             trino_type = 'VARCHAR'
-
+
         # Append field definition to Trino schema
         trino_schema.append(f'{field_name} {trino_type}')
-
-    return ',\n'.join(trino_schema)
-
+    return ',\n'.join(trino_schema)
 
 
 def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key, dag_id, ingest_delete, debug):
@@ -97,7 +95,7 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
 
     base_layer_bucket_dataset_specific = True
     append_GUID = False
-
+
     # dataset is first folder
     if dataset == '':
         dataset = 'none'
@@ -132,31 +130,31 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
     ########################################################################
     logging.info("Validate inputs...")
 
-    #debug = conf.get("debug", False)
+    # debug = conf.get("debug", False)
     logging.info(f"debug={debug}")
     assert isinstance(debug, bool)
 
     # Path to the data file within the ingest bucket excluding the bucket name
-    #ingest_key = conf.get("ingest_key", None)
+    # ingest_key = conf.get("ingest_key", None)
     logging.info(f"ingest_key={ingest_key}")
-    assert (ingest_key is not None) and \
-        isinstance(ingest_key, str) and \
-        ingest_key.endswith(".csv")
+    assert (
+        (ingest_key is not None) and
+        isinstance(ingest_key, str) and
+        ingest_key.endswith(".csv")
+    )
     ingest_path = os.path.dirname(ingest_key)
     ingest_file = os.path.basename(ingest_key)
 
-    #ingest_bucket = "ingest"
+    # ingest_bucket = "ingest"
 
-    #ingest_delete = conf.get("ingest_delete", False)
+    # ingest_delete = conf.get("ingest_delete", False)
     logging.info(f"ingest_delete={ingest_delete}")
    assert isinstance(ingest_delete, bool)
 
-
     # if using dataset buckets, check we have the target bucket
     if base_layer_bucket_dataset_specific:
         logging.info(f"Trying to create S3 Bucket : {dataset}")
-        s3_create_bucket(conn_id="s3_conn",bucket=dataset)
-
+        s3_create_bucket(conn_id="s3_conn", bucket=dataset)
 
     ingest = {
         "bucket": ingest_bucket,
@@ -188,8 +186,8 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
     logging.info(f"hive={hive}")
     ti.xcom_push("hive", hive)
 
-    iceberg_schema = f"iceberg.{dataset}" #"iceberg.ingest"
-
+    iceberg_schema = f"iceberg.{dataset}"  # "iceberg.ingest"
+
     tablename_ext = ""
     if version:
         tablename_ext = tablename_ext + f"_{version}"
@@ -198,13 +196,13 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
     iceberg_table = validate_identifier(f"{iceberg_schema}.{tablename}{tablename_ext}")
 
     iceberg_bucket = base_layer_bucket
-    iceberg_dir = validate_s3_key(f"{dataset}/{version}")
+    iceberg_dir = validate_s3_key(f"{dataset}/{version}")
     if base_layer_bucket_dataset_specific:
         iceberg_bucket = dataset
         iceberg_dir = validate_s3_key(f"{version}")
-
+
     iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}/{tablename}")
-
+
     iceberg = {
         "schema": iceberg_schema,
         "table": iceberg_table,
@@ -242,7 +240,7 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
     with fs.open(F"s3://{hive_bucket}/{hive_key}", "rb") as fp:
         schema: pq.lib.Schema = pq.ParquetFile(fp).schema
     trino_schema = pyarrow_to_trino_schema(schema)
-    #logging.info(f"trino schema={trino_schema}")
+    # logging.info(f"trino schema={trino_schema}")
 
     ########################################################################
 
@@ -310,18 +308,15 @@ def process_event(message):
     logging.info("Processing message!")
     logging.info(f"message={message}")
 
-    context = get_current_context()
-    ti = context['ti']
-
     event = unpack_minio_event(message)
     logging.info(f"event={event}")
 
-    ingest_csv_to_iceberg(dataset=event['dir_name'],
-                          tablename=event['filename'],
-                          version="20",
-                          ingest_bucket=event['bucket'],
-                          ingest_key=event['src_file_path'],
-                          dag_id=event['etag']+str(random_with_N_digits(4)),
+    ingest_csv_to_iceberg(dataset=event['dir_name'],
+                          tablename=event['filename'],
+                          version="20",
+                          ingest_bucket=event['bucket'],
+                          ingest_key=event['src_file_path'],
+                          dag_id=event['etag']+str(random_with_N_digits(4)),
                           ingest_delete=False,
                           debug=True)
diff --git a/dags/modules/databases/duckdb.py b/dags/modules/databases/duckdb.py
index 6ee01ec3..f18239ee 100644
--- a/dags/modules/databases/duckdb.py
+++ b/dags/modules/databases/duckdb.py
@@ -23,9 +23,9 @@ def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: s
         .replace("http://", "").replace("https://", "")
 
     # original duckdb
-    #con = duckdb.connect(database=':memory:')
+    # con = duckdb.connect(database=':memory:')
 
-    # try giving it some disk space ?
+    # try giving it some disk space ?
     db_path = '/tmp/database.db'
     con = duckdb.connect(database=db_path)
diff --git a/dags/modules/utils/minioevent.py b/dags/modules/utils/minioevent.py
index 42e486c7..6d957bf1 100644
--- a/dags/modules/utils/minioevent.py
+++ b/dags/modules/utils/minioevent.py
@@ -1,6 +1,6 @@
 def unpack_minio_event(message):
     import json
-
+
     message_json = json.loads(message)
 
     records = message_json["Records"][0]
@@ -31,4 +31,4 @@ def unpack_minio_event(message):
         full_file_path=full_file_path,
         head_path=head_path,
         filename=filename
-    )
\ No newline at end of file
+    )
diff --git a/dags/modules/utils/s3.py b/dags/modules/utils/s3.py
index 7bd3ecd5..c7ae460a 100644
--- a/dags/modules/utils/s3.py
+++ b/dags/modules/utils/s3.py
@@ -58,9 +58,9 @@ def s3_delete(conn_id: str, bucket, key):
     s3_get_resource(conn_id).Object(bucket, key).delete()
 
+
 def s3_create_bucket(conn_id: str, bucket):
     try:
         s3_get_resource(conn_id).create_bucket(Bucket=bucket.lower())
     except Exception as e:
         print(f"An error occurred while creating the S3 bucket: {e}")
-
diff --git a/dags/register_minio_objects.py b/dags/register_minio_objects.py
index 2811e0e4..739cfd4a 100644
--- a/dags/register_minio_objects.py
+++ b/dags/register_minio_objects.py
@@ -1,13 +1,8 @@
 import datetime
-import hashlib
-import os.path
 import logging
 import pendulum
 import psycopg2
-import boto3
-import json
 from airflow import DAG
-from airflow.operators.python import get_current_context, task
 from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 from airflow.operators.postgres_operator import PostgresOperator
 from airflow.utils.trigger_rule import TriggerRule
@@ -33,16 +28,13 @@ def process_event(message):
     logging.info("Processing message!")
     logging.info(f"message={message}")
 
-    context = get_current_context()
-    ti = context['ti']
-
     event = unpack_minio_event(message)
     logging.info(f"event={event}")
 
-    #Register eTag in postgres if not already there
+    # Register eTag in postgres if not already there
     postgres_conn = BaseHook.get_connection('pg_conn')
-
+
     # Establish connection to PostgreSQL
     conn = psycopg2.connect(
         dbname=postgres_conn.schema,
@@ -62,7 +54,6 @@ def process_event(message):
     cur.close()
     conn.close()
 
-
 consume_events = RabbitMQPythonOperator(
     func=process_event,
     task_id="consume_events",