diff --git a/databricks_scripts/extract.py b/databricks_scripts/extract.py
index 1e5662f..9774ae3 100644
--- a/databricks_scripts/extract.py
+++ b/databricks_scripts/extract.py
@@ -23,8 +23,8 @@ def do_extract():
     log_tests("Extracting data and saving...")
     extract(
-        "https://data.cityofnewyork.us/resource/c3uy-2p5r.csv?$limit=200000",
-        "air_quality.csv",
+        url="https://data.cityofnewyork.us/resource/c3uy-2p5r.csv?$limit=200000",
+        file_name="air_quality.csv",
         on_databricks=True,
     )
diff --git a/databricks_scripts/query.py b/databricks_scripts/query.py
index 9a31895..bdd6778 100644
--- a/databricks_scripts/query.py
+++ b/databricks_scripts/query.py
@@ -30,7 +30,7 @@ def query(on_databricks=False):
     save_output(result_df.toPandas().to_markdown())
     result_df.write.csv(
         (
-            "/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data/"
+            "/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data"
             if on_databricks
             else "./Aggregation_Query_Result"
         ),
@@ -40,4 +40,4 @@
 
 
 if __name__ == "__main__":
-    query()
+    query(True)
diff --git a/my_lib/extract_data.py b/my_lib/extract_data.py
index 308acfa..becd83a 100644
--- a/my_lib/extract_data.py
+++ b/my_lib/extract_data.py
@@ -10,8 +10,8 @@ def extract(url: str, file_name: str, on_databricks=False):
     file_path = (
         "/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data/"
         if on_databricks
-        else "./data/" + file_name
-    )
+        else "./data/"
+    ) + file_name
     with requests.get(url) as r:
         with open(file_path, "wb") as f:
             f.write(r.content)
diff --git a/my_lib/lib.py b/my_lib/lib.py
index f014bb8..876ca4c 100644
--- a/my_lib/lib.py
+++ b/my_lib/lib.py
@@ -1,5 +1,9 @@
-from my_lib.util import log_tests
-from my_lib.load import static_data
+import sys
+
+sys.path.append("/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/my_lib/")
+
+from util import log_tests
+from load import static_data
 from pyspark.sql import SparkSession
 from pyspark.sql.types import (
     StringType,
diff --git a/my_lib/load.py b/my_lib/load.py
index 4865567..2a80ac1 100644
--- a/my_lib/load.py
+++ b/my_lib/load.py
@@ -13,7 +13,12 @@
     FloatType,
 )
 import csv
-from my_lib.util import log_tests
+
+import sys
+
+sys.path.append("/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/my_lib/")
+
+from util import log_tests
 
 
 class static_data:
@@ -121,16 +126,64 @@ def transform_n_load(
         spark.createDataFrame(value, static_data.spark_dataframes[key].columns)
     )
 
+    # Re-cast the int and float columns explicitly; otherwise they raise type complaints and cause merge errors
+    static_data.spark_dataframes["geo_data"] = static_data.spark_dataframes[
+        "geo_data"
+    ].withColumn(
+        "geo_id", static_data.spark_dataframes["geo_data"]["geo_id"].cast(IntegerType())
+    )
+
+    static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
+        "air_quality"
+    ].withColumn(
+        "fn_indicator_id",
+        static_data.spark_dataframes["air_quality"]["fn_indicator_id"].cast(
+            IntegerType()
+        ),
+    )
+
+    static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
+        "air_quality"
+    ].withColumn(
+        "air_quality_id",
+        static_data.spark_dataframes["air_quality"]["air_quality_id"].cast(
+            IntegerType()
+        ),
+    )
+
+    static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
+        "air_quality"
+    ].withColumn(
+        "fn_geo_id",
+        static_data.spark_dataframes["air_quality"]["fn_geo_id"].cast(IntegerType()),
+    )
+
+    static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
+        "air_quality"
+    ].withColumn(
+        "data_value",
+        static_data.spark_dataframes["air_quality"]["data_value"].cast(FloatType()),
+    )
+
+    # Saving a table under the name "indicator" fails, hence the tb_ prefixes below
+    static_data.spark_dataframes["indicator"] = static_data.spark_dataframes[
+        "indicator"
+    ].withColumn(
+        "indicator_id",
+        static_data.spark_dataframes["indicator"]["indicator_id"].cast(IntegerType()),
+    )
+
     # save data
-    static_data.spark_dataframes["indicator"].write.format("delta").mode(
-        "overwrite"
-    ).saveAsTable("indicator")
     static_data.spark_dataframes["geo_data"].write.format("delta").mode(
         "overwrite"
-    ).saveAsTable("geo_data")
+    ).saveAsTable("tb_geo_data")
     static_data.spark_dataframes["air_quality"].write.format("delta").mode(
         "overwrite"
-    ).saveAsTable("air_quality")
+    ).saveAsTable("tb_air_quality")
+    static_data.spark_dataframes["indicator"].write.format("delta").mode(
+        "overwrite"
+    ).saveAsTable("tb_indicator")
 
     static_data.spark_dataframes["indicator"].show()
     static_data.spark_dataframes["geo_data"].show()