Skip to content

Commit

Permalink
etl
Browse files Browse the repository at this point in the history
  • Loading branch information
siliconshells committed Dec 14, 2024
1 parent 5457992 commit 8b8960d
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 14 deletions.
4 changes: 2 additions & 2 deletions databricks_scripts/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ def do_extract():

log_tests("Extracting data and saving...")
extract(
"https://data.cityofnewyork.us/resource/c3uy-2p5r.csv?$limit=200000",
"air_quality.csv",
url="https://data.cityofnewyork.us/resource/c3uy-2p5r.csv?$limit=200000",
file_name="air_quality.csv",
on_databricks=True,
)

Expand Down
4 changes: 2 additions & 2 deletions databricks_scripts/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def query(on_databricks=False):
save_output(result_df.toPandas().to_markdown())
result_df.write.csv(
(
"/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data/"
"/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data"
if on_databricks
else "./Aggregation_Query_Result"
),
Expand All @@ -40,4 +40,4 @@ def query(on_databricks=False):


if __name__ == "__main__":
query()
query(True)
4 changes: 2 additions & 2 deletions my_lib/extract_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ def extract(url: str, file_name: str, on_databricks=False):
file_path = (
"/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/data/"
if on_databricks
else "./data/" + file_name
)
else "./data/"
) + file_name
with requests.get(url) as r:
with open(file_path, "wb") as f:
f.write(r.content)
Expand Down
8 changes: 6 additions & 2 deletions my_lib/lib.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from my_lib.util import log_tests
from my_lib.load import static_data
import sys

sys.path.append("/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/my_lib/")

from util import log_tests
from load import static_data
from pyspark.sql import SparkSession
from pyspark.sql.types import (
StringType,
Expand Down
65 changes: 59 additions & 6 deletions my_lib/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@
FloatType,
)
import csv
from my_lib.util import log_tests

import sys

sys.path.append("/Workspace/Workspace/Shared/Leonard_Eshun_Mini_Project_Eleven/my_lib/")

from util import log_tests


class static_data:
Expand Down Expand Up @@ -121,16 +126,64 @@ def transform_n_load(
spark.createDataFrame(value, static_data.spark_dataframes[key].columns)
)

# Re-cast these columns to int and float; without the explicit cast, Spark complains and merges fail
static_data.spark_dataframes["geo_data"] = static_data.spark_dataframes[
"geo_data"
].withColumn(
"geo_id", static_data.spark_dataframes["geo_data"]["geo_id"].cast(IntegerType())
)

static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
"air_quality"
].withColumn(
"fn_indicator_id",
static_data.spark_dataframes["air_quality"]["fn_indicator_id"].cast(
IntegerType()
),
)

static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
"air_quality"
].withColumn(
"air_quality_id",
static_data.spark_dataframes["air_quality"]["air_quality_id"].cast(
IntegerType()
),
)

static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
"air_quality"
].withColumn(
"fn_geo_id",
static_data.spark_dataframes["air_quality"]["fn_geo_id"].cast(IntegerType()),
)

static_data.spark_dataframes["air_quality"] = static_data.spark_dataframes[
"air_quality"
].withColumn(
"data_value",
static_data.spark_dataframes["air_quality"]["data_value"].cast(FloatType()),
)

# Saving under the table name "indicator" will fail
static_data.spark_dataframes["indicator"] = static_data.spark_dataframes[
"indicator"
].withColumn(
"indicator_id",
static_data.spark_dataframes["indicator"]["indicator_id"].cast(IntegerType()),
)

# save data
static_data.spark_dataframes["indicator"].write.format("delta").mode(
"overwrite"
).saveAsTable("indicator")

static_data.spark_dataframes["geo_data"].write.format("delta").mode(
"overwrite"
).saveAsTable("geo_data")
).saveAsTable("tb_geo_data")
static_data.spark_dataframes["air_quality"].write.format("delta").mode(
"overwrite"
).saveAsTable("air_quality")
).saveAsTable("tb_air_quality")
static_data.spark_dataframes["indicator"].write.format("delta").mode(
"overwrite"
).saveAsTable("tb_indicator")

static_data.spark_dataframes["indicator"].show()
static_data.spark_dataframes["geo_data"].show()
Expand Down

0 comments on commit 8b8960d

Please sign in to comment.