diff --git a/.gitignore b/.gitignore
index 08373f9..402efe3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -155,3 +155,8 @@ cython_debug/
 
 # docker compose production environment
 docker-compose.prod.yml
+# import script
+code/sensor_community_archive/csv
+code/sensor_community_archive/log.txt
+code/sensor_community_archive/progress.txt
+code/sensor_community_archive/download_list.txt
\ No newline at end of file
diff --git a/code/database.py b/code/database.py
index d521382..63f02f8 100644
--- a/code/database.py
+++ b/code/database.py
@@ -1,6 +1,6 @@
 from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
+from sqlalchemy.orm import sessionmaker, scoped_session
 import os
 
@@ -15,7 +15,7 @@
 
 # Engine und Session erstellen
 engine = create_engine(DATABASE_URL)
-SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
+SessionLocal = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 # Basis-Model für die Deklaration der Datenbanktabellen
 Base = declarative_base()
diff --git a/code/download_csv.py b/code/download_csv.py
new file mode 100644
index 0000000..cb6f194
--- /dev/null
+++ b/code/download_csv.py
@@ -0,0 +1,151 @@
+import requests
+import re
+import gzip
+import os
+from urllib.parse import urljoin
+from bs4 import BeautifulSoup
+from enums import SensorModel
+from tqdm import tqdm
+from database import get_db
+from models import Station
+
+
+# folder where the downloaded csv files are stored
+DOWNLOAD_FOLDER = "sensor_community_archive/csv"
+# base URL of the Sensor.Community archive
+URL = "https://archive.sensor.community/"
+# regex pattern for a day directory
+PATTERN_DAY = r"\d\d\d\d-\d\d-\d\d/"
+# regex pattern for a year directory
+PATTERN_YEAR = r"\d\d\d\d/"
+# regex pattern to extract the station (sensor) id from a csv file name
+PATTERN_STATION_ID = r"sensor_(\d+)"
+# a list of all urls of csv files that should be imported
+# the list was generated by calling list_website(URL)
+DOWNLOAD_LIST = "sensor_community_archive/download_list.txt"
+# shows how many files have been downloaded
+PROGRESS_FILE = "sensor_community_archive/progress.txt"
+# file where logs are saved
+LOG_FILE = "sensor_community_archive/log.txt"
+
+all_csv_urls = []
+log_file = None
+
+
+def log(*l):
+    """
+    simple logging
+    """
+    print(' '.join(str(x) for x in l), file=log_file)
+
+
+def download(url, tries=5):
+    """
+    Downloads a file from the given URL, extracts it if it is a .csv.gz archive, and saves it as .csv.
+ """ + csv_filename = url.split("/")[-1].removesuffix('.gz') + raw = None + for _ in range(trys): + try: + response = requests.get(url, stream=True) + response.raise_for_status() + raw = response.raw + break + except Exception: + continue + + if not raw: + log(f'Faild to download File: {csv_filename}') + return + + with open(os.path.join(DOWNLOAD_FOLDER, csv_filename), 'wb') as csv_file: + file_content = gzip.GzipFile(fileobj=raw) if url.endswith('.gz') else raw + csv_file.write(file_content.read()) + + return csv_filename + + +def list_website(url, trys = 5): + """ + recursively finds all csv files and saves them to download_list.txt + """ + + page = None + + for _ in range(trys): + try: + response = requests.get(url) + response.raise_for_status() + page = response.text + break + except Exception: + continue + + if not page: + log(f'Faild to list: {url}') + return + + soup = BeautifulSoup(page, "html.parser") + + for item in reversed(soup.find_all("a")): + link = item.get("href") + if re.fullmatch(PATTERN_DAY, link): + if not list_website(urljoin(url, link)): + return False + if re.fullmatch(PATTERN_YEAR, link): + if not list_website(urljoin(url, link)): + return False + if link.endswith(".csv") or link.endswith(".gz"): + for sensor_name in SensorModel._names.values(): + if sensor_name.lower() in link: + break + else: + continue + csv_url = urljoin(url, link) + if csv_url in all_csv_urls: + return False + print(csv_url, file=open(DOWNLOAD_LIST, 'a')) + return True + + +def main(): + """ + 1. update download list with all url newer than the newst in the download list + 2. download missing files + """ + global all_csv_urls + + all_csv_urls = set(line.strip() for line in open(DOWNLOAD_LIST, "r").readlines()) + # append download list + list_website(URL) + + # files that have already been downloaded + cur_files = set(os.listdir(DOWNLOAD_FOLDER)) + db = next(get_db()) + stations = set(str(s.device) for s in db.query(Station).all()) + + urls_to_download = [url.strip() for url in open(DOWNLOAD_LIST, "r").readlines()] + urls_to_download.sort(reverse=True) + + for url in tqdm(urls_to_download, desc="Downloading files", unit="files", file=open(PROGRESS_FILE, "w")): + file_name = url.split("/")[-1] + station_id = re.findall(PATTERN_STATION_ID, url)[0] + + if station_id not in stations: + log(f"Skipp {station_id}") + continue + if file_name in cur_files: + log(f"Already downloaded: {file_name}") + break + download(url) + + +if __name__ == '__main__': + log_file = open(LOG_FILE, 'w') + try: + main() + except Exception as e: + import traceback + s = traceback.format_exc() + log(s) + log_file.close() \ No newline at end of file diff --git a/code/enums.py b/code/enums.py index a83270b..89f0f29 100644 --- a/code/enums.py +++ b/code/enums.py @@ -76,6 +76,21 @@ class Dimension(): NO2: "no2_ppb", } + _sensor_community_names_import = { + PM0_1: "P01", + PM1_0: "P10", + PM2_5: "P2", + PM4_0: "P4", + PM10_0: "P1", + HUMIDITY: "humidity", + TEMPERATURE: "temperature", + PRESSURE: "pressure", + CO2: "co2_ppm", + O3: "ozone_ppb", + TVOC: "tvoc", + NO2: "no2_ppb", + } + @classmethod def get_unit(cls, dimension_id: int) -> str: """ @@ -97,6 +112,10 @@ def get_name(cls, dimension_id: int) -> str: @classmethod def get_dimension_from_sensor_community_name(cls, sensor_community_name: str): return {v:k for k, v in cls._sensor_community_names.items()}.get(sensor_community_name, None) + + @classmethod + def get_dimension_from_sensor_community_name_import(cls, sensor_community_name: str): + return {v:k for k, v in 
diff --git a/code/enums.py b/code/enums.py
index a83270b..89f0f29 100644
--- a/code/enums.py
+++ b/code/enums.py
@@ -76,6 +76,21 @@ class Dimension():
         NO2: "no2_ppb",
     }
 
+    _sensor_community_names_import = {
+        PM0_1: "P01",
+        PM1_0: "P10",
+        PM2_5: "P2",
+        PM4_0: "P4",
+        PM10_0: "P1",
+        HUMIDITY: "humidity",
+        TEMPERATURE: "temperature",
+        PRESSURE: "pressure",
+        CO2: "co2_ppm",
+        O3: "ozone_ppb",
+        TVOC: "tvoc",
+        NO2: "no2_ppb",
+    }
+
     @classmethod
     def get_unit(cls, dimension_id: int) -> str:
         """
@@ -97,6 +112,10 @@ def get_name(cls, dimension_id: int) -> str:
     @classmethod
     def get_dimension_from_sensor_community_name(cls, sensor_community_name: str):
         return {v:k for k, v in cls._sensor_community_names.items()}.get(sensor_community_name, None)
+
+    @classmethod
+    def get_dimension_from_sensor_community_name_import(cls, sensor_community_name: str):
+        return {v:k for k, v in cls._sensor_community_names_import.items()}.get(sensor_community_name, None)
 
 
 class SensorModel():
diff --git a/code/import_from_csv.py b/code/import_from_csv.py
new file mode 100644
index 0000000..86f9ae7
--- /dev/null
+++ b/code/import_from_csv.py
@@ -0,0 +1,148 @@
+import io
+import pandas as pd
+import os
+from tqdm import tqdm
+from sqlalchemy.orm import sessionmaker, scoped_session
+from sqlalchemy import create_engine
+from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
+from models import *
+from enums import SensorModel, Dimension
+
+
+DOWNLOAD_FOLDER = "sensor_community_archive/csv"
+LOG_FILE = "sensor_community_archive/log.txt"
+PROGRESS_FILE = "sensor_community_archive/progress.txt"
+
+log_file = None
+
+# INIT DB
+# read the database settings from environment variables
+DB_USER = os.getenv("POSTGRES_USER", "")
+DB_PASS = os.getenv("POSTGRES_PASSWORD", "")
+DB_HOST = os.getenv("DB_HOST", "")
+DB_NAME = os.getenv("POSTGRES_DB", "")
+
+# build the DATABASE_URL with an f-string
+DATABASE_URL = f"postgresql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}"
+engine = create_engine(DATABASE_URL)
+db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
+
+
+def log(*l):
+    """
+    simple logging
+    """
+    print(' '.join(str(x) for x in l), file=log_file)
+
+
+def import_sensor_community_archive_from_csv(csv_file_path: str):
+    """
+    Expected CSV header: sensor_id;sensor_type;location;lat;lon;timestamp;pressure;altitude;pressure_sealevel;temperature
+    """
+    db = db_session()
+    df = pd.read_csv(csv_file_path, encoding='utf8', sep=";")
+
+    for row in df.iterrows():
+        # check if the sensor_id is in the database
+        idx, data = row
+        device = str(data['sensor_id'])
+        time_measured = datetime.fromisoformat(data['timestamp'])
+        sensor_model = {v: k for k, v in SensorModel._names.items()}.get(data['sensor_type'], None)
+        db_station = db.query(Station).filter(Station.device == device).first()
+
+        if not db_station or not sensor_model:
+            continue
+
+        m = (
+            db.query(Measurement)
+            .filter(
+                Measurement.station_id == db_station.id,
+                Measurement.time_measured == time_measured,
+                Measurement.sensor_model == sensor_model
+            )
+            .first()
+        )
+
+        # if the measurement is already present, skip it
+        if m:
+            continue
+
+        db_measurement = Measurement(
+            sensor_model=sensor_model,
+            station_id=db_station.id,
+            time_measured=time_measured,
+            time_received=None,
+            location_id=db_station.location_id
+        )
+
+        db.add(db_measurement)
+        db.commit()
+        db.refresh(db_measurement)
+
+        #log(f"Created measurement: {vars(db_measurement)}")
+
+        for dim_name, val in list(data.items())[6:]:
+            dim = Dimension.get_dimension_from_sensor_community_name_import(dim_name)
+            try:
+                val = float(val)
+            except ValueError:
+                #log(f"Value is not a float: {val}")
+                continue
+            if not dim:
+                continue
+            if pd.isna(val):
+                continue
+
+            db_value = Values(
+                dimension=dim,
+                value=float(val),
+                measurement_id=db_measurement.id
+            )
+            db.add(db_value)
+            #log(f"Added value: {vars(db_value)}")
+
+    db.commit()
+    db.close()
+
+
+# single-threaded variant (kept for reference)
+'''
+def main():
+    # List all files in the download folder and process them
+    for filename in tqdm(os.listdir(DOWNLOAD_FOLDER), desc="Import CSV files", unit="Files", file=open(PROGRESS_FILE, "w")):
+        file_path = os.path.join(DOWNLOAD_FOLDER, filename)
+
+        # Ensure it's a file (not a directory)
+        if os.path.isfile(file_path):
+            # Import the csv file
+            import_sensor_community_archive_from_csv(file_path)
+'''
+
+
+# multi-threaded variant
+def main():
+    # List all files in the download folder
+    files = [
+        os.path.join(DOWNLOAD_FOLDER, filename)
+        for filename in os.listdir(DOWNLOAD_FOLDER)
+        if os.path.isfile(os.path.join(DOWNLOAD_FOLDER, filename))
+    ]
+
+    # Progress tracking with tqdm
+    with open(PROGRESS_FILE, "w") as progress_file:
+        with tqdm(total=len(files), desc="Import CSV files", unit="Files", file=progress_file) as pbar:
+            # ThreadPoolExecutor for parallel processing
+            with ThreadPoolExecutor() as executor:
+                # Update the progress bar after each task
+                def process_file(file_path):
+                    import_sensor_community_archive_from_csv(file_path)
+                    pbar.update(1)
+
+                # Submit all tasks to the executor
+                for file_path in files:
+                    executor.submit(process_file, file_path)
+
+
+if __name__ == "__main__":
+    log_file = open(LOG_FILE, 'w')
+    main()
+    log_file.close()
\ No newline at end of file
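import_from_csv.py resolves each CSV column through the mapping added to code/enums.py; columns without an entry (such as altitude or pressure_sealevel) are skipped. A small usage sketch of that helper, not part of this diff:

```python
# Sketch (not part of this diff): resolving Sensor.Community archive CSV column
# names to internal dimension ids via the helper added in code/enums.py.
from enums import Dimension

for column in ("P1", "P2", "temperature", "altitude"):
    dimension = Dimension.get_dimension_from_sensor_community_name_import(column)
    if dimension is None:
        # columns without a mapping are skipped, just like in
        # import_sensor_community_archive_from_csv()
        continue
    print(column, "->", dimension)
```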
diff --git a/code/main.py b/code/main.py
index 794da23..f9743cf 100644
--- a/code/main.py
+++ b/code/main.py
@@ -60,7 +60,8 @@
 scheduler = BackgroundScheduler()
 
 # Planen Sie die Aufgabe alle 5 Minuten
-#scheduler.add_job(import_sensor_community_data, 'interval', minutes=5)
+import_sensor_community_data()
+scheduler.add_job(import_sensor_community_data, 'interval', minutes=5)
 
 # Scheduler starten
 scheduler.start()
diff --git a/code/services/data_service.py b/code/services/data_service.py
index ae2f212..069eec4 100644
--- a/code/services/data_service.py
+++ b/code/services/data_service.py
@@ -4,7 +4,7 @@
 from models import Station, Measurement, Values, Location
 from datetime import datetime
 from utils import get_or_create_location
-from enums import Dimension
+from enums import Dimension, SensorModel
 
 def process_and_import_data(db: Session, data, source):
     for entry_index, entry in enumerate(data):
@@ -39,13 +39,17 @@ def process_and_import_data(db: Session, data, source):
 
                 logging.debug(f"Sensor Type: {sensor_type}, Wert: {value}, Typ von Wert: {type(value)}")
 
                 # Dimension Mapping (wie vorher gezeigt)
-                dimension = Dimension.get_dimension_from_sensor_community_name(sensor_type)
+                dimension = Dimension.get_dimension_from_sensor_community_name_import(sensor_type)
 
                 logging.debug(f"Dimension für Sensor Type '{sensor_type}': {dimension}")
 
                 if dimension:
+                    sensor_model = {v:k for k,v in SensorModel._names.items()}.get(entry["sensor"]["sensor_type"]["name"], None)
+                    if not sensor_model:
+                        continue
                     sensors.setdefault(str(entry["sensor"]["id"]), {
-                        "type": entry["sensor"]["sensor_type"]["id"],
+                        # sensor type name translated to the internal SensorModel id above
+                        "type": sensor_model,
                         "data": {}
                     })
diff --git a/docker-compose.yml b/docker-compose.yml
index db4aa09..2db3ea8 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,7 +11,7 @@ services:
     env_file:
       - .env
     networks:
-      - internal
+      - internal
   db:
     image: postgres:16
     ports:
@@ -22,15 +22,7 @@
       - .env
     networks:
       - internal
-  # db_test:
-  #   image: postgres:16
-  #   environment:
-  #     POSTGRES_USER: test_user
-  #     POSTGRES_PASSWORD: test_password
-  #     POSTGRES_DB: test_database
-  #   networks:
-  #     - internal
-
+
 networks:
   internal:
diff --git a/example-docker-compose.prod.yml b/example-docker-compose.prod.yml
index 3288ee9..84b6122 100644
--- a/example-docker-compose.prod.yml
+++ b/example-docker-compose.prod.yml
@@ -1,36 +1,65 @@
 services:
   app:
-    image: luftdaten/api:0.2.0
+    image: luftdaten/api:staging
     restart: unless-stopped
-    container_name: luftdaten-api-app
-    command: /bin/sh -c "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 80 --reload"
+    command: /bin/sh -c "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 80 --reload & (python download_csv.py && python import_from_csv.py)"
     expose:
       - 80
     labels:
       - "traefik.enable=true"
-      - "traefik.http.routers.luftdaten-api.entrypoints=https"
-      - "traefik.http.routers.luftdaten-api.rule=(Host(`api.luftdaten.at`))"
-      - "traefik.http.routers.luftdaten-api.tls=true"
-      - "traefik.http.routers.luftdaten-api.tls.certresolver=http"
-      - "traefik.http.routers.luftdaten-api.service=luftdaten-api"
-      - "traefik.http.services.luftdaten-api.loadbalancer.server.port=80"
+      - "traefik.http.routers.luftdaten-api-staging.entrypoints=https"
+      - "traefik.http.routers.luftdaten-api-staging.rule=(Host(`staging.api.luftdaten.at`))"
+      - "traefik.http.routers.luftdaten-api-staging.tls=true"
+      - "traefik.http.routers.luftdaten-api-staging.tls.certresolver=http"
+      - "traefik.http.routers.luftdaten-api-staging.service=luftdaten-api-staging"
+      - "traefik.http.services.luftdaten-api-staging.loadbalancer.server.port=80"
       - "traefik.docker.network=proxy"
-      - "traefik.http.routers.luftdaten-api.middlewares=default@file"
+      - "traefik.http.routers.luftdaten-api-staging.middlewares=default@file"
+#      - "com.centurylinklabs.watchtower.enable=true" # this explicitly enables automated updates for this service
     networks:
+      - default
       - proxy
     depends_on:
       - db
     env_file:
       - .env
+    volumes:
+      - ./sensor_community_archive:/usr/src/app/sensor_community_archive
   db:
     image: postgres:16
-    container_name: luftdaten-api-db
+    networks:
+      - default
     volumes:
       - postgres_data:/var/lib/postgresql/data
     env_file:
       - .env
+  admin:
+    image: dpage/pgadmin4:latest
+    env_file:
+      - .env
+    depends_on:
+      - db
+    volumes:
+      - ./pgadmin-data/:/var/lib/pgadmin/
+    labels:
+      - "traefik.enable=true"
+      - "traefik.docker.network=proxy"
+      - "traefik.http.routers.luftdaten-api-staging-admin.entrypoints=https"
+      - "traefik.http.routers.luftdaten-api-staging-admin.rule=Host(`admin.staging.api.luftdaten.at`)"
+      - "traefik.http.routers.luftdaten-api-staging-admin.tls=true"
+      - "traefik.http.routers.luftdaten-api-staging-admin.tls.certresolver=http"
+      - "traefik.http.routers.luftdaten-api-staging-admin.service=luftdaten-api-staging-admin"
+      - "traefik.http.services.luftdaten-api-staging-admin.loadbalancer.server.port=80"
+      - "traefik.http.routers.luftdaten-api-staging-admin.middlewares=default@file"
+      - "com.centurylinklabs.watchtower.enable=true" # this explicitly enables automated updates for this service
+    networks:
+      - proxy
+      - default
+
 volumes:
   postgres_data:
+
 networks:
+  default:
   proxy:
-    external: true
\ No newline at end of file
+    external: true
diff --git a/requirements.txt b/requirements.txt
index 785ce75..93119e8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -53,3 +53,6 @@ urllib3==2.2.3
 uvicorn==0.30.6
 vine==5.1.0
 wcwidth==0.2.13
+beautifulsoup4
+pandas
+tqdm
\ No newline at end of file
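For a quick smoke test of the import path, a single downloaded file can be pushed through the importer from a Python shell in the app container. A sketch under the assumptions of this PR (database environment variables set, the sensor_community_archive volume mounted), not part of the diff itself:

```python
# Smoke-test sketch (not part of this diff): import one downloaded archive CSV.
# Importing import_from_csv creates the engine from the POSTGRES_* / DB_HOST
# environment variables, so run this inside the app container (or with the same
# variables exported).
import os
from import_from_csv import DOWNLOAD_FOLDER, import_sensor_community_archive_from_csv

csv_files = sorted(os.listdir(DOWNLOAD_FOLDER))
if csv_files:
    # import only the first file; the full run is handled by main()
    import_sensor_community_archive_from_csv(os.path.join(DOWNLOAD_FOLDER, csv_files[0]))
```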