Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

18 test background task to calculate hourly averages #30

Merged
merged 8 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions code/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
# ... etc.


# Funktion, um bestimmte Tabellen oder Views von den Migrationen auszuschließen
def include_object(object, name, type_, reflected, compare_to):
    """Alembic ``include_object`` hook: exclude objects from autogenerate.

    The ``hourly_avg`` database view is mapped in the ORM as a table
    (``HourlyDimensionAverages``), so without this hook autogenerate would
    try to create/drop it as a real table. Returning ``False`` excludes it.

    Note: the parameter list (including the builtin-shadowing ``object``
    name) is dictated by Alembic's hook contract and must not change.
    """
    # Exclude the 'hourly_avg' view (reflected as a table) from migrations.
    if type_ == "table" and name == "hourly_avg":
        return False
    return True


def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
Expand All @@ -47,6 +55,7 @@ def run_migrations_offline() -> None:
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
include_object=include_object
)

with context.begin_transaction():
Expand All @@ -73,6 +82,7 @@ def run_migrations_online() -> None:
connection=connection, target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
include_object=include_object
)

with context.begin_transaction():
Expand Down
52 changes: 52 additions & 0 deletions code/alembic/versions/06c69449c7d0_test_exclude_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""test exclude view

Revision ID: 06c69449c7d0
Revises: 8e1c45de1275
Create Date: 2024-11-08 13:00:41.414103

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision: str = '06c69449c7d0'
down_revision: Union[str, None] = '8e1c45de1275'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Replace table ``StationStatus`` with lowerCamelCase ``stationStatus``.

    NOTE(review): this renames by create-new/drop-old — any rows already in
    ``StationStatus`` are NOT copied and are lost. Confirm no data exists,
    or use ``op.rename_table`` instead. TODO confirm.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('stationStatus',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('station_id', sa.Integer(), nullable=True),
        sa.Column('timestamp', sa.DateTime(), nullable=True),
        sa.Column('level', sa.Integer(), nullable=True),
        sa.Column('message', sa.String(), nullable=True),
        # presumably 'stations' was created by an earlier revision — verify
        sa.ForeignKeyConstraint(['station_id'], ['stations.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_stationStatus_id'), 'stationStatus', ['id'], unique=False)
    # Drop the old-cased index before the old table it belongs to.
    op.drop_index('ix_StationStatus_id', table_name='StationStatus')
    op.drop_table('StationStatus')
    # ### end Alembic commands ###


def downgrade() -> None:
    """Restore the original ``StationStatus`` table (reverse of upgrade).

    NOTE(review): like the upgrade, this is a create/drop rename — rows
    written to ``stationStatus`` after the upgrade are lost on downgrade.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('StationStatus',
        sa.Column('id', sa.INTEGER(), autoincrement=True, nullable=False),
        sa.Column('station_id', sa.INTEGER(), autoincrement=False, nullable=True),
        sa.Column('timestamp', postgresql.TIMESTAMP(), autoincrement=False, nullable=True),
        sa.Column('level', sa.INTEGER(), autoincrement=False, nullable=True),
        sa.Column('message', sa.VARCHAR(), autoincrement=False, nullable=True),
        # Constraint names match what PostgreSQL auto-generated originally.
        sa.ForeignKeyConstraint(['station_id'], ['stations.id'], name='StationStatus_station_id_fkey'),
        sa.PrimaryKeyConstraint('id', name='StationStatus_pkey')
    )
    op.create_index('ix_StationStatus_id', 'StationStatus', ['id'], unique=False)
    op.drop_index(op.f('ix_stationStatus_id'), table_name='stationStatus')
    op.drop_table('stationStatus')
    # ### end Alembic commands ###
33 changes: 33 additions & 0 deletions code/alembic/versions/8e1c45de1275_create_hourly_avg_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""create hourly_avg view

Revision ID: 8e1c45de1275
Revises: 5516d2dd3e78
Create Date: 2024-11-07 14:21:42.941381

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '8e1c45de1275'
down_revision: Union[str, None] = '5516d2dd3e78'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the ``hourly_avg`` view.

    The inner query averages measurement values per (station, dimension,
    hour); the outer query folds those per-dimension averages into one
    JSONB object per (station_id, hour) row via ``jsonb_object_agg``.
    """
    # Fix: the FROM-subquery now carries an explicit alias. PostgreSQL
    # versions before 16 reject an unaliased subquery in FROM
    # ("subquery in FROM must have an alias"); PG16+ merely tolerates it.
    # (The inner ORDER BY is redundant under the outer aggregation but is
    # kept to avoid changing the view definition further.)
    op.execute("""create view hourly_avg as
select station_id, hour, jsonb_object_agg(dimension, avg) as dimension_avg
from (select station_id, dimension, date_trunc('hour', time_measured) as hour, avg(value)
from measurements as m join values as v on v.measurement_id = m.id
join stations as s on s.id = m.station_id
group by station_id, dimension, hour
order by station_id, dimension, hour) as per_dimension_hour
group by station_id, hour;""")


def downgrade() -> None:
    """Drop the ``hourly_avg`` view; ``if exists`` makes this idempotent."""
    op.execute("drop view if exists hourly_avg")
19 changes: 19 additions & 0 deletions code/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,21 @@ class Dimension():
SGP40_ADJUSTED_GAS: "SGP40 Adjusted Gas",
}

_sensor_community_names = {
PM0_1: "P01",
PM1_0: "P1",
PM2_5: "P2",
PM4_0: "P4",
PM10_0: "P10",
HUMIDITY: "humidity",
TEMPERATURE: "temperature",
PRESSURE: "pressure",
CO2: "co2_ppm",
O3: "ozone_ppb",
TVOC: "tvoc",
NO2: "no2_ppb",
}

@classmethod
def get_unit(cls, dimension_id: int) -> str:
"""
Expand All @@ -79,6 +94,10 @@ def get_name(cls, dimension_id: int) -> str:
"""
return cls._names.get(dimension_id, "Unknown")

@classmethod
def get_dimension_from_sensor_community_name(cls, sensor_community_name: str):
    """Map a Sensor.Community field name (e.g. ``"P2"``) to a dimension id.

    Returns ``None`` when the name is unknown.
    """
    # Fix: the original rebuilt the entire reversed dict on every call;
    # scan the mapping and return the first (unique) match instead.
    for dimension_id, sc_name in cls._sensor_community_names.items():
        if sc_name == sensor_community_name:
            return dimension_id
    return None


class SensorModel():
SEN5X = 1
Expand Down
15 changes: 13 additions & 2 deletions code/models.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime
from sqlalchemy import Column, Integer, String, Float, ForeignKey, DateTime, JSON
from sqlalchemy.orm import relationship
from database import Base

Expand Down Expand Up @@ -69,6 +69,8 @@ class Station(Base):
location_id = Column(Integer, ForeignKey('locations.id'))
location = relationship("Location", back_populates="stations")
measurements = relationship("Measurement", back_populates="station")
hourly_avg = relationship("HourlyDimensionAverages", back_populates="station")
stationStatus = relationship("StationStatus", back_populates="station")


class Measurement(Base):
Expand Down Expand Up @@ -110,10 +112,19 @@ class HourlyAverages(Base):


class StationStatus(Base):
    """Status/log entry reported for a station (level + message at a time)."""

    # Fix: the diff residue carried two __tablename__ assignments
    # ("StationStatus" then "stationStatus"); only the new lowerCamelCase
    # name — matching migration 06c69449c7d0 — is kept.
    __tablename__ = "stationStatus"

    id = Column(Integer, primary_key=True, index=True)
    station_id = Column(Integer, ForeignKey('stations.id'))
    station = relationship("Station", back_populates="stationStatus")
    # timestamp of the status event — presumably UTC; confirm with writers
    timestamp = Column(DateTime)
    # level: integer severity — NOTE(review): semantics not defined here
    level = Column(Integer)
    message = Column(String)

class HourlyDimensionAverages(Base):
    """Read-only ORM mapping of the ``hourly_avg`` database view.

    The view is excluded from Alembic autogenerate, so no migration ever
    creates or drops a real table for it.
    """

    __tablename__ = 'hourly_avg'  # This should match your view name in PostgreSQL

    # Composite primary key: one row per station per hour.
    station_id = Column(Integer, ForeignKey('stations.id'), primary_key=True)  # Assuming 'station_id' uniquely identifies the record
    station = relationship("Station", back_populates="hourly_avg")
    hour = Column(DateTime, primary_key=True)  # Hour as a datetime truncated to hour precision
    dimension_avg = Column(JSON)  # JSON column to store {dimension_id: avg_value} dictionary
70 changes: 65 additions & 5 deletions code/routers/station.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import json
import io
from functools import wraps
from enum import Enum

from models import Station, Location, Measurement, Values, StationStatus
from models import Station, Location, Measurement, Values, StationStatus, HourlyDimensionAverages
from schemas import StationDataCreate, SensorsCreate, StationStatusCreate
from utils import get_or_create_location, download_csv, get_or_create_station
from services.hourly_average import calculate_hourly_average
Expand Down Expand Up @@ -208,13 +209,31 @@ async def create_station_data(

return {"status": "success"}

class Precision(str, Enum):
    """Granularity of historical data returned by the station endpoints.

    The member values double as human-readable descriptions in the
    generated OpenAPI docs.
    """
    MAX = "all data points"
    HOURLY = "hourly avg (one data point per hour)"

class OutputFormat(str, Enum):
    """Serialization formats supported by the historical data endpoint."""
    JSON = "json"
    CSV = "csv"

@router.get("/test", response_class=Response, tags=["station"])
async def test(
    station_ids: list[str] = Query(..., description="List of station ids"),
    start: datetime = Query(None, description="Start of time interval"),
    end: datetime = Query(None, description="End of time interval"),
    precision: Precision = Query(..., description="Precision of data points")
):
    """Unimplemented stub endpoint.

    NOTE(review): registered at GET /test but does nothing and returns an
    empty 200 response — presumably an experiment with list-typed query
    parameters and the Precision enum; confirm it is removed (or finished)
    before release.
    """
    pass


@router.get("/historical", response_class=Response, tags=["station"])
async def get_historical_station_data(
station_ids: str = Query(..., description="Comma-separated list of station devices"),
start: str = Query(..., description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
end: str = Query(..., description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
output_format: str = "csv",
start: str = Query(None, description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
end: str = Query(None, description="Supply in format: YYYY-MM-DDThh:mm. Time is optional."),
output_format: OutputFormat = Query(OutputFormat.CSV, description="Ouput format"),
precision: Precision = Query(Precision.MAX, description="Precision of data points"),
db: Session = Depends(get_db)
):
# Konvertiere die Liste von station_devices in eine Liste
Expand All @@ -227,6 +246,47 @@ async def get_historical_station_data(
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DDThh:mm")

if precision == Precision.HOURLY:
query = db.query(
HourlyDimensionAverages
).join(
Station
).filter(
Station.device.in_(devices)
)

if output_format == "csv":
csv_data = "device,time_measured,dimension,value\n"
for measurement in query.all():
for dim, val in measurement.dimension_avg.items():
csv_data += f"{measurement.station.device},{measurement.hour},{int(dim)},{val}\n"
return Response(content=csv_data, media_type="text/csv")
else:
if start_date:
query.filter(HourlyDimensionAverages.hour >= start_date)
if end_date:
query.filter(HourlyDimensionAverages.hour <= end_date)

json_data = [
{
"device": measurement.station.device,
"time_measured": measurement.hour.strftime("%Y-%m-%dT%H:%M"),
"values": [
{
"dimension": int(dim),
"value": val
}
for dim, val in measurement.dimension_avg.items()
]
}
for measurement in query.all()
]

return Response(content=json.dumps(json_data), media_type="application/json")

return


# Datenbankabfrage, um die Stationen nach station_device zu filtern
query = db.query(Measurement).join(Station).filter(Station.device.in_(devices))

Expand All @@ -250,7 +310,7 @@ async def get_historical_station_data(
json_data = [
{
"device": measurement.station.device,
"time_measured": measurement.time_measured,
"time_measured": measurement.time_measured.strftime("%Y-%m-%dT%H:%M"),
"values": [{"dimension": value.dimension, "value": value.value} for value in measurement.values]
}
for measurement in measurements
Expand Down
19 changes: 3 additions & 16 deletions code/services/data_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from models import Station, Measurement, Values, Location
from datetime import datetime
from utils import get_or_create_location
from enums import Dimension

def process_and_import_data(db: Session, data, source):
for entry_index, entry in enumerate(data):
Expand Down Expand Up @@ -38,21 +39,7 @@ def process_and_import_data(db: Session, data, source):
logging.debug(f"Sensor Type: {sensor_type}, Wert: {value}, Typ von Wert: {type(value)}")

# Dimension Mapping (wie vorher gezeigt)
dimension = None
if sensor_type == "temperature":
dimension = 7 # Beispiel: Temperatur
elif sensor_type == "humidity":
dimension = 6 # Beispiel: Luftfeuchtigkeit
elif sensor_type == "P0":
dimension = 2 # Beispiel: PM1
elif sensor_type == "P1":
dimension = 5 # Beispiel: PM10
elif sensor_type == "P2":
dimension = 3 # Beispiel: PM2.5
elif sensor_type == "P4":
dimension = 4 # Beispiel: PM4
elif sensor_type == "pressure":
dimension = 8 # Beispiel: Druck
dimension = Dimension.get_dimension_from_sensor_community_name(sensor_type)

logging.debug(f"Dimension für Sensor Type '{sensor_type}': {dimension}")

Expand Down Expand Up @@ -192,4 +179,4 @@ def import_station_data(db: Session, station_data, sensors):
db.commit()
logging.debug("Alle Daten wurden erfolgreich in die Datenbank geschrieben.")

return {"status": "success"}
return {"status": "success"}
2 changes: 1 addition & 1 deletion code/services/hourly_average.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,4 @@ def calculate_hourly_average(station_id: int, db: Session):
)
db.add(hourly_avg)

db.commit()
db.commit()
3 changes: 2 additions & 1 deletion code/tasks/periodic_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import requests
from database import get_db
from services.data_service import process_and_import_data
from enums import Source

# Logging-Konfiguration
logging.basicConfig(level=logging.DEBUG)
Expand Down Expand Up @@ -37,7 +38,7 @@ def import_sensor_community_data():
return

try:
process_and_import_data(db, data, source=3) # Verarbeite und speichere die Daten
process_and_import_data(db, data, source=Source.SC) # Verarbeite und speichere die Daten
logger.info("Data processed and imported successfully.")
except Exception as e:
logger.error(f"Error processing and importing data: {e}")
Expand Down