Merge pull request #30 from luftdaten-at/18-test-background-task-to-calculate-hourly-averages

18 test background task to calculate hourly averages
n11ik authored Nov 8, 2024
2 parents 5d60671 + 0b3f899 commit 9b9b4d7
Showing 9 changed files with 198 additions and 25 deletions.
10 changes: 10 additions & 0 deletions code/alembic/env.py
@@ -29,6 +29,14 @@
# ... etc.


# Function to exclude specific tables or views from migrations
def include_object(object, name, type_, reflected, compare_to):
    # Example: exclude the 'hourly_avg' view from migrations
    if type_ == "table" and name == "hourly_avg":
        return False
    return True


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.
@@ -47,6 +55,7 @@ def run_migrations_offline() -> None:
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        include_object=include_object
    )

    with context.begin_transaction():
@@ -73,6 +82,7 @@ def run_migrations_online() -> None:
            connection=connection, target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
            include_object=include_object
        )

        with context.begin_transaction():
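If more view-backed tables are added later, the same hook generalizes. A minimal sketch, assuming a module-level set of names (EXCLUDED_VIEWS is illustrative, not part of this commit):

# Hypothetical extension: a module-level set of view names to skip.
EXCLUDED_VIEWS = {"hourly_avg"}

def include_object(object, name, type_, reflected, compare_to):
    # Alembic calls this for every schema object during autogenerate;
    # returning False drops the object from the comparison.
    if type_ == "table" and name in EXCLUDED_VIEWS:
        return False
    return True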
52 changes: 52 additions & 0 deletions code/alembic/versions/06c69449c7d0_test_exclude_view.py
@@ -0,0 +1,52 @@
"""test exclude view
Revision ID: 06c69449c7d0
Revises: 8e1c45de1275
Create Date: 2024-11-08 13:00:41.414103
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = '06c69449c7d0'
down_revision: Union[str, None] = '8e1c45de1275'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('stationStatus',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('station_id', sa.Integer(), nullable=True),
    sa.Column('timestamp', sa.DateTime(), nullable=True),
    sa.Column('level', sa.Integer(), nullable=True),
    sa.Column('message', sa.String(), nullable=True),
    sa.ForeignKeyConstraint(['station_id'], ['stations.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_stationStatus_id'), 'stationStatus', ['id'], unique=False)
    op.drop_index('ix_StationStatus_id', table_name='StationStatus')
    op.drop_table('StationStatus')
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('StationStatus',
    sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
    sa.Column('station_id', sa.INTEGER(), autoincrement=False, nullable=True),
    sa.Column('timestamp', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
    sa.Column('level', sa.INTEGER(), autoincrement=False, nullable=True),
    sa.Column('message', sa.VARCHAR(), autoincrement=False, nullable=True),
    sa.ForeignKeyConstraint(['station_id'], ['stations.id'], name='StationStatus_station_id_fkey'),
    sa.PrimaryKeyConstraint('id', name='StationStatus_pkey')
    )
    op.create_index('ix_StationStatus_id', 'StationStatus', ['id'], unique=False)
    op.drop_index(op.f('ix_stationStatus_id'), table_name='stationStatus')
    op.drop_table('stationStatus')
    # ### end Alembic commands ###
33 changes: 33 additions & 0 deletions code/alembic/versions/8e1c45de1275_create_hourly_avg_view.py
@@ -0,0 +1,33 @@
"""create hourly_avg view
Revision ID: 8e1c45de1275
Revises: 5516d2dd3e78
Create Date: 2024-11-07 14:21:42.941381
"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '8e1c45de1275'
down_revision: Union[str, None] = '5516d2dd3e78'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    op.execute("""create view hourly_avg as
        select station_id, hour, jsonb_object_agg(dimension, avg) as dimension_avg
        from (select station_id, dimension, date_trunc('hour', time_measured) as hour, avg(value)
              from measurements as m join values as v on v.measurement_id = m.id
              join stations as s on s.id = m.station_id
              group by station_id, dimension, hour
              order by station_id, dimension, hour) as hourly  -- subqueries in FROM require an alias in PostgreSQL
        group by station_id, hour;""")


def downgrade() -> None:
    op.execute("drop view if exists hourly_avg")
19 changes: 19 additions & 0 deletions code/enums.py
@@ -61,6 +61,21 @@ class Dimension():
        SGP40_ADJUSTED_GAS: "SGP40 Adjusted Gas",
    }

    _sensor_community_names = {
        PM0_1: "P01",
        PM1_0: "P1",
        PM2_5: "P2",
        PM4_0: "P4",
        PM10_0: "P10",
        HUMIDITY: "humidity",
        TEMPERATURE: "temperature",
        PRESSURE: "pressure",
        CO2: "co2_ppm",
        O3: "ozone_ppb",
        TVOC: "tvoc",
        NO2: "no2_ppb",
    }

    @classmethod
    def get_unit(cls, dimension_id: int) -> str:
        """
@@ -79,6 +94,10 @@ def get_name(cls, dimension_id: int) -> str:
        """
        return cls._names.get(dimension_id, "Unknown")

    @classmethod
    def get_dimension_from_sensor_community_name(cls, sensor_community_name: str):
        # Builds the reverse mapping on every call; fine for a map of this size.
        return {v: k for k, v in cls._sensor_community_names.items()}.get(sensor_community_name, None)


class SensorModel():
    SEN5X = 1
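A quick usage sketch of the new reverse lookup (constants per the mapping above):

from enums import Dimension

# "P2" is sensor.community's name for PM2.5; unknown names yield None.
assert Dimension.get_dimension_from_sensor_community_name("P2") == Dimension.PM2_5
assert Dimension.get_dimension_from_sensor_community_name("does-not-exist") is None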
15 changes: 13 additions & 2 deletions code/models.py
@@ -1,4 +1,4 @@
-from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime
+from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime, JSON
from sqlalchemy.orm import relationship
from database import Base

@@ -69,6 +69,8 @@ class Station(Base):
    location_id = Column(Integer, ForeignKey('locations.id'))
    location = relationship("Location", back_populates="stations")
    measurements = relationship("Measurement", back_populates="station")
    hourly_avg = relationship("HourlyDimensionAverages", back_populates="station")
    stationStatus = relationship("StationStatus", back_populates="station")


class Measurement(Base):
@@ -110,10 +112,19 @@ class HourlyAverages(Base):


class StationStatus(Base):
-    __tablename__ = "StationStatus"
+    __tablename__ = "stationStatus"

    id = Column(Integer, primary_key=True, index=True)
    station_id = Column(Integer, ForeignKey('stations.id'))
    station = relationship("Station", back_populates="stationStatus")
    timestamp = Column(DateTime)
    level = Column(Integer)
    message = Column(String)


class HourlyDimensionAverages(Base):
    __tablename__ = 'hourly_avg'  # must match the view name in PostgreSQL

    station_id = Column(Integer, ForeignKey('stations.id'), primary_key=True)  # composite key together with 'hour'
    station = relationship("Station", back_populates="hourly_avg")
    hour = Column(DateTime, primary_key=True)  # datetime truncated to hour precision
    dimension_avg = Column(JSON)  # stores the {dimension_id: avg_value} dictionary
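Because hourly_avg is a database view, the mapped class is effectively read-only. A minimal sketch of reading it through the new relationship (the open Session `db` and the device id are assumptions):

from models import Station

# Hourly averages for one station via the new Station.hourly_avg relationship.
station = db.query(Station).filter(Station.device == "station-123").first()  # illustrative device id
for row in station.hourly_avg:
    print(row.hour, row.dimension_avg)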
70 changes: 65 additions & 5 deletions code/routers/station.py
@@ -7,8 +7,9 @@
import json
import io
from functools import wraps
from enum import Enum

-from models import Station, Location, Measurement, Values, StationStatus
+from models import Station, Location, Measurement, Values, StationStatus, HourlyDimensionAverages
from schemas import StationDataCreate, SensorsCreate, StationStatusCreate
from utils import get_or_create_location, download_csv, get_or_create_station
from services.hourly_average import calculate_hourly_average
@@ -208,13 +209,31 @@ async def create_station_data(

    return {"status": "success"}

class Precision(str, Enum):
    MAX = "all data points"
    HOURLY = "hourly avg (one data point per hour)"

class OutputFormat(str, Enum):
    JSON = "json"
    CSV = "csv"

@router.get("/test", response_class=Response, tags=["station"])
async def test(
    station_ids: list[str] = Query(..., description="List of station ids"),
    start: datetime = Query(None, description="Start of time interval"),
    end: datetime = Query(None, description="End of time interval"),
    precision: Precision = Query(..., description="Precision of data points")
):
    pass


@router.get("/historical", response_class=Response, tags=["station"])
async def get_historical_station_data(
station_ids: str = Query(..., description="Comma-separated list of station devices"),
start: str = Query(..., description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
end: str = Query(..., description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
output_format: str = "csv",
start: str = Query(None, description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
end: str = Query(None, description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
output_format: OutputFormat = Query(OutputFormat.CSV, description="Ouput format"),
precision: Precision = Query(Precision.MAX, description="Precision of data points"),
db: Session = Depends(get_db)
):
    # Convert the comma-separated station_ids string into a list
@@ -227,6 +246,47 @@ async def get_historical_station_data(
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DDThh:mm")

    if precision == Precision.HOURLY:
        query = db.query(
            HourlyDimensionAverages
        ).join(
            Station
        ).filter(
            Station.device.in_(devices)
        )

        # Apply the time window before branching on output format; Query.filter
        # returns a new query, so the result must be reassigned.
        if start_date:
            query = query.filter(HourlyDimensionAverages.hour >= start_date)
        if end_date:
            query = query.filter(HourlyDimensionAverages.hour <= end_date)

        if output_format == OutputFormat.CSV:
            csv_data = "device,time_measured,dimension,value\n"
            for measurement in query.all():
                for dim, val in measurement.dimension_avg.items():
                    csv_data += f"{measurement.station.device},{measurement.hour},{int(dim)},{val}\n"
            return Response(content=csv_data, media_type="text/csv")
        else:
            json_data = [
                {
                    "device": measurement.station.device,
                    "time_measured": measurement.hour.strftime("%Y-%m-%dT%H:%M"),
                    "values": [
                        {
                            "dimension": int(dim),
                            "value": val
                        }
                        for dim, val in measurement.dimension_avg.items()
                    ]
                }
                for measurement in query.all()
            ]

            return Response(content=json.dumps(json_data), media_type="application/json")


    # Database query filtering the measurements by station device
    query = db.query(Measurement).join(Station).filter(Station.device.in_(devices))

@@ -250,7 +310,7 @@ async def get_historical_station_data(
    json_data = [
        {
            "device": measurement.station.device,
-            "time_measured": measurement.time_measured,
+            "time_measured": measurement.time_measured.strftime("%Y-%m-%dT%H:%M"),
            "values": [{"dimension": value.dimension, "value": value.value} for value in measurement.values]
        }
        for measurement in measurements
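A request sketch for the extended endpoint (base URL, router prefix, and device ids are illustrative; the precision value is the Precision enum's string value, which FastAPI validates against):

import requests

resp = requests.get(
    "http://localhost:8000/station/historical",  # assumed base URL and router prefix
    params={
        "station_ids": "station-1,station-2",    # illustrative device ids
        "start": "2024-11-01T00:00",
        "end": "2024-11-08T00:00",
        "output_format": "json",
        "precision": "hourly avg (one data point per hour)",  # Precision.HOURLY's value
    },
)
print(resp.json())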
19 changes: 3 additions & 16 deletions code/services/data_service.py
@@ -4,6 +4,7 @@
from models import Station, Measurement, Values, Location
from datetime import datetime
from utils import get_or_create_location
from enums import Dimension

def process_and_import_data(db: Session, data, source):
    for entry_index, entry in enumerate(data):
@@ -38,21 +39,7 @@ def process_and_import_data(db: Session, data, source):
logging.debug(f"Sensor Type: {sensor_type}, Wert: {value}, Typ von Wert: {type(value)}")

# Dimension Mapping (wie vorher gezeigt)
dimension = None
if sensor_type == "temperature":
dimension = 7 # Beispiel: Temperatur
elif sensor_type == "humidity":
dimension = 6 # Beispiel: Luftfeuchtigkeit
elif sensor_type == "P0":
dimension = 2 # Beispiel: PM1
elif sensor_type == "P1":
dimension = 5 # Beispiel: PM10
elif sensor_type == "P2":
dimension = 3 # Beispiel: PM2.5
elif sensor_type == "P4":
dimension = 4 # Beispiel: PM4
elif sensor_type == "pressure":
dimension = 8 # Beispiel: Druck
dimension = Dimension.get_dimension_from_sensor_community_name(sensor_type)

logging.debug(f"Dimension für Sensor Type '{sensor_type}': {dimension}")

@@ -192,4 +179,4 @@ def import_station_data(db: Session, station_data, sensors):
    db.commit()
    logging.debug("All data was successfully written to the database.")

-    return {"status": "success"}
\ No newline at end of file
+    return {"status": "success"}
2 changes: 1 addition & 1 deletion code/services/hourly_average.py
@@ -26,4 +26,4 @@ def calculate_hourly_average(station_id: int, db: Session):
    )
    db.add(hourly_avg)

-    db.commit()
\ No newline at end of file
+    db.commit()
3 changes: 2 additions & 1 deletion code/tasks/periodic_tasks.py
Expand Up @@ -2,6 +2,7 @@
import requests
from database import get_db
from services.data_service import process_and_import_data
from enums import Source

# Logging configuration
logging.basicConfig(level=logging.DEBUG)
@@ -37,7 +38,7 @@ def import_sensor_community_data():
        return

    try:
-        process_and_import_data(db, data, source=3)  # process and store the data
+        process_and_import_data(db, data, source=Source.SC)  # process and store the data
        logger.info("Data processed and imported successfully.")
    except Exception as e:
        logger.error(f"Error processing and importing data: {e}")
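The Source enum itself is not shown in this diff; judging by the replaced literal, it presumably looks something like this in code/enums.py (a sketch, with SC matching the previous hardcoded 3):

class Source():
    # Assumed shape, mirroring the constant-style classes in enums.py;
    # only SC is implied by this diff, replacing the literal source=3.
    SC = 3  # sensor.community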
