Merge pull request #44 from luftdaten-at/37-import-sensorcomunity-data-from-archive

37 import sensorcomunity data from archive
n11ik authored Dec 2, 2024
2 parents 9dda217 + e20118b commit c0be4cb
Showing 10 changed files with 380 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .gitignore
@@ -155,3 +155,8 @@ cython_debug/

# docker compose production environment
docker-compose.prod.yml
# import script
code/sensor_community_archive/csv
code/sensor_community_archive/log.txt
code/sensor_community_archive/progress.txt
code/sensor_community_archive/download_list.txt
4 changes: 2 additions & 2 deletions code/database.py
@@ -1,6 +1,6 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, scoped_session

import os

@@ -15,7 +15,7 @@

# Create engine and session
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

# Base model for declaring the database tables
Base = declarative_base()
151 changes: 151 additions & 0 deletions code/download_csv.py
@@ -0,0 +1,151 @@
import requests
import re
import gzip
import os
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from enums import SensorModel
from tqdm import tqdm
from database import get_db
from models import Station


# Folder where csv files are stored
DOWNLOAD_FOLDER = "sensor_community_archive/csv"
# url from where to download data
URL = "https://archive.sensor.community/"
# regex pattern for a day
PATTERN_DAY = r"\d\d\d\d-\d\d-\d\d/"
# regex pattern for a year
PATTERN_YEAR = r"\d\d\d\d/"
# regex pattern for the station id in a csv file url
PATTERN_STATION_ID = r"sensor_(\d+)"
# a list of all url of csv files that should be imported
# the list was generated by calling list_website(URL)
DOWNLOAD_LIST = "sensor_community_archive/download_list.txt"
# shows how many files have been downloaded
PROGRESS_FILE = "sensor_community_archive/progress.txt"
# file where logs are saved
LOG_FILE = "sensor_community_archive/log.txt"

all_csv_urls = []
log_file = None


def log(*l):
"""
simple logging
"""
print(' '.join(str(x) for x in l), file=log_file)


def download(url, tries=5):
"""
Downloads a file from the given URL, extracts if .csv.gz, and saves as .csv.
"""
csv_filename = url.split("/")[-1].removesuffix('.gz')
raw = None
for _ in range(tries):
try:
response = requests.get(url, stream=True)
response.raise_for_status()
raw = response.raw
break
except Exception:
continue

if not raw:
log(f'Failed to download file: {csv_filename}')
return

with open(os.path.join(DOWNLOAD_FOLDER, csv_filename), 'wb') as csv_file:
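# decompress .gz archives on the fly from the raw response stream; plain csv responses are written unchanged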
file_content = gzip.GzipFile(fileobj=raw) if url.endswith('.gz') else raw
csv_file.write(file_content.read())

return csv_filename


def list_website(url, tries=5):
"""
recursively walks the archive index pages and appends new csv file urls to download_list.txt;
returns False once an already-known url is reached, so callers stop descending into older directories
"""

page = None

for _ in range(tries):
try:
response = requests.get(url)
response.raise_for_status()
page = response.text
break
except Exception:
continue

if not page:
log(f'Failed to list: {url}')
return

soup = BeautifulSoup(page, "html.parser")

for item in reversed(soup.find_all("a")):
link = item.get("href")
if re.fullmatch(PATTERN_DAY, link):
if not list_website(urljoin(url, link)):
return False
if re.fullmatch(PATTERN_YEAR, link):
if not list_website(urljoin(url, link)):
return False
if link.endswith(".csv") or link.endswith(".gz"):
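# keep only files belonging to known sensor models (for/else: links with no matching model name are skipped)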
for sensor_name in SensorModel._names.values():
if sensor_name.lower() in link:
break
else:
continue
csv_url = urljoin(url, link)
if csv_url in all_csv_urls:
return False
print(csv_url, file=open(DOWNLOAD_LIST, 'a'))
return True


def main():
"""
1. update the download list with all urls newer than the newest entry already in it
2. download the missing files
"""
global all_csv_urls

all_csv_urls = set(line.strip() for line in open(DOWNLOAD_LIST, "r").readlines())
# append download list
list_website(URL)

# files that have already been downloaded
cur_files = set(os.listdir(DOWNLOAD_FOLDER))
db = next(get_db())
stations = set(str(s.device) for s in db.query(Station).all())

urls_to_download = [url.strip() for url in open(DOWNLOAD_LIST, "r").readlines()]
urls_to_download.sort(reverse=True)

for url in tqdm(urls_to_download, desc="Downloading files", unit="files", file=open(PROGRESS_FILE, "w")):
file_name = url.split("/")[-1]
station_id = re.findall(PATTERN_STATION_ID, url)[0]

if station_id not in stations:
log(f"Skipp {station_id}")
continue
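# urls are sorted newest-first, so the first already-downloaded file means all older files are present too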
if file_name in cur_files:
log(f"Already downloaded: {file_name}")
break
download(url)


if __name__ == '__main__':
log_file = open(LOG_FILE, 'w')
try:
main()
except Exception as e:
import traceback
s = traceback.format_exc()
log(s)
log_file.close()
19 changes: 19 additions & 0 deletions code/enums.py
@@ -76,6 +76,21 @@ class Dimension():
NO2: "no2_ppb",
}
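
# column names expected in the Sensor.Community archive csv exports (per this mapping, P1 = PM10 and P2 = PM2.5)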

_sensor_community_names_import = {
PM0_1: "P01",
PM1_0: "P10",
PM2_5: "P2",
PM4_0: "P4",
PM10_0: "P1",
HUMIDITY: "humidity",
TEMPERATURE: "temperature",
PRESSURE: "pressure",
CO2: "co2_ppm",
O3: "ozone_ppb",
TVOC: "tvoc",
NO2: "no2_ppb",
}

@classmethod
def get_unit(cls, dimension_id: int) -> str:
"""
@@ -97,6 +112,10 @@ def get_name(cls, dimension_id: int) -> str:
@classmethod
def get_dimension_from_sensor_community_name(cls, sensor_community_name: str):
return {v:k for k, v in cls._sensor_community_names.items()}.get(sensor_community_name, None)

@classmethod
def get_dimension_from_sensor_community_name_import(cls, sensor_community_name: str):
return {v:k for k, v in cls._sensor_community_names_import.items()}.get(sensor_community_name, None)


class SensorModel():
148 changes: 148 additions & 0 deletions code/import_from_csv.py
@@ -0,0 +1,148 @@
import io
import pandas as pd
import os
from tqdm import tqdm
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from models import *
from enums import SensorModel, Dimension


DOWNLOAD_FOLDER = "sensor_community_archive/csv"
LOG_FILE = "sensor_community_archive/log.txt"
PROGRESS_FILE = "sensor_community_archive/progress.txt"

log_file = None

# INIT DB
# Read environment variables
DB_USER = os.getenv("POSTGRES_USER", "")
DB_PASS = os.getenv("POSTGRES_PASSWORD", "")
DB_HOST = os.getenv("DB_HOST", "")
DB_NAME = os.getenv("POSTGRES_DB", "")

# Build the DATABASE_URL using an f-string
DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}"
engine = create_engine(DATABASE_URL)
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
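# scoped_session hands each thread its own Session, so the csv import can run in parallel worker threads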


def log(*l):
"""
simple logging
"""
print(' '.join(str(x) for x in l), file=log_file)


def import_sensor_community_archive_from_csv(csv_file_path: str):
"""
Import one archive csv file; semicolon-separated columns, e.g.:
sensor_id;sensor_type;location;lat;lon;timestamp;pressure;altitude;pressure_sealevel;temperature
"""
db = db_session()
df = pd.read_csv(csv_file_path, encoding='utf8', sep=";")

for row in df.iterrows():
# check if sensor_id in database
idx, data = row
device = str(data['sensor_id'])
time_measured = datetime.fromisoformat(data['timestamp'])
sensor_model = {v: k for k, v in SensorModel._names.items()}.get(data['sensor_type'], None)
db_station = db.query(Station).filter(Station.device == device).first()

if not db_station or not sensor_model:
continue

m = (
db.query(Measurement)
.filter(
Measurement.station_id == db_station.id,
Measurement.time_measured == time_measured,
Measurement.sensor_model == sensor_model
)
.first()
)

# if measurement is already present skip
if m:
continue

db_measurement = Measurement(
sensor_model=sensor_model,
station_id=db_station.id,
time_measured=time_measured,
time_received=None,
location_id=db_station.location_id
)

db.add(db_measurement)
db.commit()
db.refresh(db_measurement)

#log(f"Created measurement: {vars(db_measurement)}")

for dim_name, val in list(data.items())[6:]:
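# the first six columns (sensor_id, sensor_type, location, lat, lon, timestamp) are metadata; the rest are measured dimensions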
dim = Dimension.get_dimension_from_sensor_community_name_import(dim_name)
try:
val = float(val)
except ValueError:
#log(f"Value is not a float: {val}")
continue
if not dim:
continue
# skip missing values (NaN never compares equal to itself)
if pd.isna(val):
continue

db_value = Values(
dimension=dim,
value=float(val),
measurement_id=db_measurement.id
)
db.add(db_value)
#log(f"Added value: {vars(db_value)}")

db.commit()
db.close()

# single-threaded version (kept for reference, not used)
'''
def main():
# List all files in the download folder and process them
for filename in tqdm(os.listdir(DOWNLOAD_FOLDER), desc="Import CSV files", unit="Files", file=open(PROGRESS_FILE, "w")):
file_path = os.path.join(DOWNLOAD_FOLDER, filename)
# Ensure it's a file (not a directory)
if os.path.isfile(file_path):
# Read the file content as a string
import_sensor_community_archive_from_csv(file_path)
'''

# multi-threaded version
def main():
# List all files in the download folder
files = [
os.path.join(DOWNLOAD_FOLDER, filename)
for filename in os.listdir(DOWNLOAD_FOLDER)
if os.path.isfile(os.path.join(DOWNLOAD_FOLDER, filename))
]

# Progress tracking with tqdm
with open(PROGRESS_FILE, "w") as progress_file:
with tqdm(total=len(files), desc="Import CSV files", unit="Files", file=progress_file) as pbar:
# ThreadPoolExecutor for parallel processing
with ThreadPoolExecutor() as executor:
# Define a function to update the progress bar after each task
def process_file(file_path):
import_sensor_community_archive_from_csv(file_path)
pbar.update(1)

# Submit all tasks to the executor
for file_path in files:
executor.submit(process_file, file_path)
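# leaving the ThreadPoolExecutor context waits for all submitted tasks to finish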


if __name__ == "__main__":
log_file = open(LOG_FILE, 'w')
main()
log_file.close()
3 changes: 2 additions & 1 deletion code/main.py
@@ -60,7 +60,8 @@
scheduler = BackgroundScheduler()

# Schedule the task every 5 minutes
#scheduler.add_job(import_sensor_community_data, 'interval', minutes=5)
import_sensor_community_data()
scheduler.add_job(import_sensor_community_data, 'interval', minutes=5)

# Start the scheduler
scheduler.start()