style(dags-container): make dags pass flake8 linting rules (#51)
JossWhittle authored Apr 8, 2024
1 parent be27a8f commit 19a0f59
Showing 5 changed files with 36 additions and 50 deletions.
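
Side note, not part of the commit: one way to reproduce the lint check locally is flake8's legacy Python API. The file path below is one of the files changed here; no flake8 configuration ships with this diff, so the defaults are an assumption.

import flake8.api.legacy as flake8

# Lint one of the changed DAG files with flake8's default rule set.
style_guide = flake8.get_style_guide()
report = style_guide.check_files(["dags/ingest_csv_to_iceberg2.py"])
print(f"flake8 violations: {report.total_errors}")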
63 changes: 29 additions & 34 deletions dags/ingest_csv_to_iceberg2.py
@@ -4,12 +4,11 @@
import logging
import pendulum
import pyarrow.parquet as pq
import boto3
import json
import s3fs
from random import randint
from airflow import DAG
from airflow.operators.python import get_current_context, task
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.hooks.base import BaseHook
@@ -18,8 +17,6 @@
from modules.databases.duckdb import s3_csv_to_parquet
from modules.utils.s3 import s3_delete
from modules.utils.s3 import s3_create_bucket
from modules.utils.sql import escape_dataset
from modules.utils.sha1 import sha1
from modules.utils.minioevent import unpack_minio_event
from modules.databases.trino import (
create_schema,
@@ -32,16 +29,19 @@
validate_s3_key
)


def random_with_N_digits(n):
range_start = 10**(n-1)
range_end = (10**n)-1
return randint(range_start, range_end)


def sha1(value):
sha_1 = hashlib.sha1()
sha_1.update(str(value).encode('utf-8'))
return sha_1.hexdigest()


def pyarrow_to_trino_schema(schema):
trino_schema = []

@@ -60,7 +60,7 @@ def pyarrow_to_trino_schema(schema):
else:
trino_type = 'INTEGER'
elif field_type.startswith('Float'):
trino_type = 'DOUBLE'
trino_type = 'DOUBLE'
elif field_type.startswith('String'):
trino_type = 'VARCHAR'
elif field_type == 'Object':
@@ -76,13 +76,11 @@ def pyarrow_to_trino_schema(schema):
else:
# Use VARCHAR as default for unsupported types
trino_type = 'VARCHAR'

# Append field definition to Trino schema
trino_schema.append(f'{field_name} {trino_type}')

return ',\n'.join(trino_schema)


return ',\n'.join(trino_schema)


def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key, dag_id, ingest_delete, debug):
@@ -97,7 +95,7 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
base_layer_bucket_dataset_specific = True

append_GUID = False

# dataset is first folder
if dataset == '':
dataset = 'none'
@@ -132,31 +130,31 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
########################################################################
logging.info("Validate inputs...")

#debug = conf.get("debug", False)
# debug = conf.get("debug", False)
logging.info(f"debug={debug}")
assert isinstance(debug, bool)

# Path to the data file within the ingest bucket excluding the bucket name
#ingest_key = conf.get("ingest_key", None)
# ingest_key = conf.get("ingest_key", None)
logging.info(f"ingest_key={ingest_key}")
assert (ingest_key is not None) and \
isinstance(ingest_key, str) and \
ingest_key.endswith(".csv")
assert (
(ingest_key is not None) and
isinstance(ingest_key, str) and
ingest_key.endswith(".csv")
)

ingest_path = os.path.dirname(ingest_key)
ingest_file = os.path.basename(ingest_key)
#ingest_bucket = "ingest"
# ingest_bucket = "ingest"

#ingest_delete = conf.get("ingest_delete", False)
# ingest_delete = conf.get("ingest_delete", False)
logging.info(f"ingest_delete={ingest_delete}")
assert isinstance(ingest_delete, bool)


# if using dataset buckets, check we have the target bucket
if base_layer_bucket_dataset_specific:
logging.info(f"Trying to create S3 Bucket : {dataset}")
s3_create_bucket(conn_id="s3_conn",bucket=dataset)

s3_create_bucket(conn_id="s3_conn", bucket=dataset)

ingest = {
"bucket": ingest_bucket,
@@ -188,8 +186,8 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
logging.info(f"hive={hive}")
ti.xcom_push("hive", hive)

iceberg_schema = f"iceberg.{dataset}" #"iceberg.ingest"
iceberg_schema = f"iceberg.{dataset}" # "iceberg.ingest"

tablename_ext = ""
if version:
tablename_ext = tablename_ext + f"_{version}"
@@ -198,13 +196,13 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
iceberg_table = validate_identifier(f"{iceberg_schema}.{tablename}{tablename_ext}")

iceberg_bucket = base_layer_bucket
iceberg_dir = validate_s3_key(f"{dataset}/{version}")
iceberg_dir = validate_s3_key(f"{dataset}/{version}")
if base_layer_bucket_dataset_specific:
iceberg_bucket = dataset
iceberg_dir = validate_s3_key(f"{version}")

iceberg_path = validate_s3_key(f"{iceberg_bucket}/{iceberg_dir}/{tablename}")

iceberg = {
"schema": iceberg_schema,
"table": iceberg_table,
@@ -242,7 +240,7 @@ def ingest_csv_to_iceberg(dataset, tablename, version, ingest_bucket, ingest_key
with fs.open(F"s3://{hive_bucket}/{hive_key}", "rb") as fp:
schema: pq.lib.Schema = pq.ParquetFile(fp).schema
trino_schema = pyarrow_to_trino_schema(schema)
#logging.info(f"trino schema={trino_schema}")
# logging.info(f"trino schema={trino_schema}")

########################################################################

@@ -310,18 +308,15 @@ def process_event(message):
logging.info("Processing message!")
logging.info(f"message={message}")

context = get_current_context()
ti = context['ti']

event = unpack_minio_event(message)
logging.info(f"event={event}")

ingest_csv_to_iceberg(dataset=event['dir_name'],
tablename=event['filename'],
version="20",
ingest_csv_to_iceberg(dataset=event['dir_name'],
tablename=event['filename'],
version="20",
ingest_bucket=event['bucket'],
ingest_key=event['src_file_path'],
dag_id=event['etag']+str(random_with_N_digits(4)),
ingest_key=event['src_file_path'],
dag_id=event['etag']+str(random_with_N_digits(4)),
ingest_delete=False,
debug=True)

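Usage sketch for the two helpers now defined at module level in this DAG; the eTag value below is hypothetical, whereas process_event() takes it from the MinIO event.

import hashlib
from random import randint


def random_with_N_digits(n):
    range_start = 10 ** (n - 1)
    range_end = (10 ** n) - 1
    return randint(range_start, range_end)


def sha1(value):
    sha_1 = hashlib.sha1()
    sha_1.update(str(value).encode('utf-8'))
    return sha_1.hexdigest()


# Hypothetical object eTag standing in for event['etag'].
etag = sha1("example-object")

# Mirrors dag_id=event['etag'] + str(random_with_N_digits(4)) in process_event().
dag_id = etag + str(random_with_N_digits(4))
print(dag_id)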
4 changes: 2 additions & 2 deletions dags/modules/databases/duckdb.py
@@ -23,9 +23,9 @@ def s3_csv_to_parquet(conn_id: str, src_bucket: str, dst_bucket: str, src_key: s
.replace("http://", "").replace("https://", "")

# original duckdb
#con = duckdb.connect(database=':memory:')
# con = duckdb.connect(database=':memory:')

# try giving it some disk space ?
# try giving it some disk space ?
db_path = '/tmp/database.db'
con = duckdb.connect(database=db_path)

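Context for the duckdb.py hunk above: the conversion uses an on-disk DuckDB database instead of an in-memory one, so large CSV-to-Parquet jobs are not limited by available RAM. A minimal sketch of the difference, using the path hard-coded in the module:

import duckdb

# Original approach: everything held in RAM.
# con = duckdb.connect(database=':memory:')

# Current approach: back the database with a file on disk.
db_path = '/tmp/database.db'
con = duckdb.connect(database=db_path)
con.execute("SELECT 42").fetchall()
con.close()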
4 changes: 2 additions & 2 deletions dags/modules/utils/minioevent.py
@@ -1,6 +1,6 @@
def unpack_minio_event(message):
import json

message_json = json.loads(message)

records = message_json["Records"][0]
@@ -31,4 +31,4 @@ def unpack_minio_event(message):
full_file_path=full_file_path,
head_path=head_path,
filename=filename
)
)
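Background for the minioevent.py hunk: MinIO bucket notifications follow the S3 event schema, so unpack_minio_event receives a JSON string along the lines of the hypothetical payload below (the exact layout is an assumption, not shown in this diff).

import json

# Hypothetical notification payload for illustration only.
message = json.dumps({
    "Records": [{
        "s3": {
            "bucket": {"name": "ingest"},
            "object": {"key": "dataset_a/table.csv", "eTag": "abc123"},
        }
    }]
})

message_json = json.loads(message)
record = message_json["Records"][0]
print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])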
2 changes: 1 addition & 1 deletion dags/modules/utils/s3.py
@@ -58,9 +58,9 @@ def s3_delete(conn_id: str, bucket, key):

s3_get_resource(conn_id).Object(bucket, key).delete()


def s3_create_bucket(conn_id: str, bucket):
try:
s3_get_resource(conn_id).create_bucket(Bucket=bucket.lower())
except Exception as e:
print(f"An error occurred while creating the S3 bucket: {e}")
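Usage sketch for the two helpers in this module, matching how they are called elsewhere in this commit; the bucket and key names are hypothetical, while "s3_conn" is the connection id used in ingest_csv_to_iceberg2.py.

from modules.utils.s3 import s3_create_bucket, s3_delete

# Bucket names are lowercased by s3_create_bucket; creation errors
# (e.g. the bucket already exists) are printed rather than raised.
s3_create_bucket(conn_id="s3_conn", bucket="dataset_a")

# Remove the source CSV once it has been ingested.
s3_delete(conn_id="s3_conn", bucket="ingest", key="dataset_a/table.csv")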

13 changes: 2 additions & 11 deletions dags/register_minio_objects.py
@@ -1,13 +1,8 @@
import datetime
import hashlib
import os.path
import logging
import pendulum
import psycopg2
import boto3
import json
from airflow import DAG
from airflow.operators.python import get_current_context, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.trigger_rule import TriggerRule
@@ -33,16 +28,13 @@ def process_event(message):
logging.info("Processing message!")
logging.info(f"message={message}")

context = get_current_context()
ti = context['ti']

event = unpack_minio_event(message)
logging.info(f"event={event}")

#Register eTag in postgres if not already there
# Register eTag in postgres if not already there

postgres_conn = BaseHook.get_connection('pg_conn')

# Establish connection to PostgreSQL
conn = psycopg2.connect(
dbname=postgres_conn.schema,
@@ -62,7 +54,6 @@ def process_event(message):
cur.close()
conn.close()


consume_events = RabbitMQPythonOperator(
func=process_event,
task_id="consume_events",
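For completeness, a sketch of the PostgreSQL connection pattern this DAG uses. Only dbname=postgres_conn.schema appears in the visible hunk, so the remaining keyword arguments are assumed from the standard Airflow connection attributes.

import psycopg2
from airflow.hooks.base import BaseHook

postgres_conn = BaseHook.get_connection('pg_conn')

# Map the Airflow connection onto psycopg2's keyword arguments.
conn = psycopg2.connect(
    dbname=postgres_conn.schema,
    user=postgres_conn.login,
    password=postgres_conn.password,
    host=postgres_conn.host,
    port=postgres_conn.port,
)
cur = conn.cursor()
cur.execute("SELECT 1")
cur.close()
conn.close()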
