# clean.py
import pandas as pd
import duckdb
import gcsfs
from utils import convert_to_last_day_of_quarter, generate_days_in_years
import env
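
# The helpers imported above live elsewhere in this repo; judging from how they are used
# below they are assumed to behave roughly as follows (an assumption, not the repo's code):
#   convert_to_last_day_of_quarter(label, roman=True) -> "YYYY-MM-DD" string for the last
#       day of the quarter named by `label` (roman=False for non-Roman quarter labels).
#   generate_days_in_years(start, end) -> iterable of (date, last_date) pairs, one entry
#       per calendar day from `start` to `end` inclusive.
# env.LINK is assumed to be a path prefix (a local directory or a "gs://bucket/" URL)
# where the raw input files and the cleaned parquet outputs live.
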
def clean_flats():
    # Primary-market ("Rynek pierwotny") flat price data; columns X:AO hold the quarter
    # label plus the 17 tracked cities, with the real header on row 7 of the sheet.
    flats = pd.read_excel(f"{env.LINK}flat_prices.xlsx", header=6, usecols="X:AO",
                          sheet_name="Rynek pierwotny")
    cities = ['Białystok', 'Bydgoszcz', 'Gdańsk', 'Gdynia', 'Katowice', 'Kielce', 'Kraków',
              'Lublin', 'Łódź', 'Olsztyn', 'Opole', 'Poznań', 'Rzeszów', 'Szczecin',
              'Warszawa', 'Wrocław', 'Zielona Góra']
    # Drop the ".1" suffix pandas appends to duplicated column names and any trailing "*";
    # regex=False keeps both replacements literal.
    flats.columns = flats.columns.str.replace(".1", "", regex=False)
    flats.columns = flats.columns.str.replace("*", "", regex=False)
    print(flats.head())
    # Keep only rows that carry a quarter label ("Kwartał").
    flats = flats[flats['Kwartał'].notna()]
    # Unpivot from one column per city to long format: (Kwartał, variable, value).
    flats_unpivot = pd.melt(flats, id_vars='Kwartał', value_vars=cities)
    flats_unpivot['date'] = flats_unpivot.apply(
        lambda row: convert_to_last_day_of_quarter(row['Kwartał']), axis=1)
    flats_unpivot['date'] = pd.to_datetime(flats_unpivot['date'])
    flats_unpivot['city'] = flats_unpivot['variable']
    flats_unpivot.to_parquet(f"{env.LINK}flats_price.parquet")
def clean_salary():
    # When env.LINK points at a GCS bucket, upload the local CSV there first so the
    # read below hits the same location.
    if "gs://" in env.LINK:
        fs = gcsfs.GCSFileSystem()
        fs.put("data/salary.csv", f"{env.LINK}salary.csv")
    salary = pd.read_csv(f"{env.LINK}salary.csv")
    # Strip spaces used as thousands separators; the value stays a string here.
    salary['salary'] = salary['salary'].str.replace(" ", "")
    salary['date'] = salary.apply(
        lambda row: convert_to_last_day_of_quarter(row['qt'], False), axis=1)
    salary['date'] = pd.to_datetime(salary['date'])
    salary.to_parquet(f"{env.LINK}salary.parquet")
def clean_m1():
    # M1 money-supply cleaning is not wired up yet; the intended steps are kept below.
    pass
    # m1 = pd.read_csv(f"{env.LINK}m1.csv")
    # m1['value'] = m1['value'].str.replace(" ", "")
    # m1['date'] = m1.apply(lambda row: convert_to_last_day_of_quarter(row['date'], False), axis=1)
    # m1['date'] = pd.to_datetime(m1['date'])
    # m1.to_parquet("data/m1.parquet")
def clean_currency():
    # Gold prices come in two files (gold.csv plus a 2006-2012 backfill); tag both with a
    # currency label so they can be stacked with the USD quotes.
    gold = pd.read_csv(f"{env.LINK}gold.csv", names=["date", "price"], header=None)
    gold2013 = pd.read_csv(f"{env.LINK}gold_2006_2012.csv", names=["date", "price"], header=None)
    gold['date'] = pd.to_datetime(gold['date'])
    gold2013['date'] = pd.to_datetime(gold2013['date'])
    gold['currency'] = 'gold'
    gold2013['currency'] = 'gold'
    # Daily calendar covering 2006-2023, used to spread quotes onto every calendar day.
    calendar = pd.DataFrame(generate_days_in_years(2006, 2023), columns=["date", "last_date"])
    calendar['date'] = pd.to_datetime(calendar['date'])
    usd = pd.read_csv(f"{env.LINK}usd.csv", names=["date", "price"], header=None)
    usd['date'] = pd.to_datetime(usd['date'])
    usd['currency'] = 'usd'
    currency = pd.concat([gold, gold2013, usd], ignore_index=True, sort=False)
    # Fill gaps: for every calendar day keep the most recent quote from the preceding
    # three days (weekends and holidays have no fixing), one row per currency (lp = 1).
    currency_daily = duckdb.sql("""
        select
            date,
            price,
            currency
        from
        (
            select
                row_number() over (partition by currency, a.date order by b.date desc) lp,
                a.date,
                b.date org_date,
                b.price,
                currency
            from
                calendar a
                left join currency b
                    on b.date between a.date - INTERVAL 3 DAY and a.date
        )
        where
            lp = 1
        order by date
    """).to_df()
    currency_daily.to_parquet(f"{env.LINK}currency.parquet")
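
# Minimal usage sketch (an illustration, assuming env.LINK is a local directory and the
# cleaning functions have already been run): the cleaned tables can be queried straight
# from parquet with DuckDB, e.g.
#   duckdb.sql(
#       f"select city, max(value) from '{env.LINK}flats_price.parquet' group by city"
#   ).to_df()
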
if __name__ == "__main__":
    clean_flats()
    clean_currency()
    clean_m1()
    clean_salary()