diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8e8d2931..1fb5cdaf 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -53,13 +53,17 @@ jobs: environment-name: prod cache-environment: true + - name: Install Python requirements from pip + run: | + python -m pip install -r pip_requirements.txt + - name: Install and Test run: | set -x python -m pip install . -vv --no-deps cp pywwa_settings.json-example pywwa_settings.json # This is a bit brittle, but loads some data - python util/ugcs_update.py fz08mr23 2023 3 08 + python util/ugcs_update.py --filename=fz08mr23 --date=2023-03-08 python util/merge_hvtec_nwsli.py hvtec.list.20241009.csv python util/copy_iem_network.py MN_ASOS python util/copy_iem_network.py NY_COOP diff --git a/environment.yml b/environment.yml index a4453347..d9db8672 100644 --- a/environment.yml +++ b/environment.yml @@ -35,8 +35,8 @@ dependencies: - psycopg # Transient via pyIEM - pyarrow - # So much of other things here are redundant - - pyiem + # I'm lame + # - pyiem # BUFR processing - pybufrkit # Models diff --git a/pip_requirements.txt b/pip_requirements.txt new file mode 100644 index 00000000..5e420e15 --- /dev/null +++ b/pip_requirements.txt @@ -0,0 +1 @@ +git+https://github.com/akrherz/pyIEM.git diff --git a/util/ugcs_update.py b/util/ugcs_update.py index b91c8b48..198bca9a 100644 --- a/util/ugcs_update.py +++ b/util/ugcs_update.py @@ -31,14 +31,17 @@ import os import sys import zipfile +from datetime import datetime, timezone +import click import geopandas as gpd import httpx -from pyiem.util import get_sqlalchemy_conn, logger, utc +import pandas as pd +from psycopg.rows import dict_row +from pyiem.database import get_sqlalchemy_conn, sql_helper +from pyiem.util import logger from shapely.geometry import MultiPolygon -from pywwa.database import get_dbconnc - # Some strangely encoded WFOs need to be rectified WFO_XREF = { "PQW": "GUM", @@ -101,102 +104,110 @@ def new_poly(geo): def 
db_fixes(cursor, valid): """Fix some issues in the database""" for year in [2000, 2005, 2010, 2015, 2020]: - cursor.execute( - "with pops as (select gid, coalesce(sum(population), 0) as pop " - f"from ugcs u LEFT JOIN gpw{year} g on " - "st_contains(u.geom, g.geom) where " - f"u.gpw_population_{year} is null GROUP by gid) " - f"update ugcs u SET gpw_population_{year} = pop FROM pops p " - "WHERE u.gid = p.gid" + table = f"gpw{year}" + col = f"gpw_population_{year}" + res = cursor.execute( + sql_helper( + """ + with pops as ( + select gid, coalesce(sum(population), 0) as pop + from ugcs u LEFT JOIN {table} g on st_contains(u.geom, g.geom) where + u.{col} is null GROUP by gid) + update ugcs u SET {col} = pop FROM pops p WHERE u.gid = p.gid""", + table=table, + col=col, + ) ) - LOG.info("Set %s rows for gpw_population_%s", cursor.rowcount, year) + LOG.info("Set %s rows for gpw_population_%s", res.rowcount, year) - cursor.execute( - "update ugcs SET geom = st_makevalid(geom) where end_ts is null " - "and not st_isvalid(geom) and begin_ts = %s", - (valid,), + res = cursor.execute( + sql_helper(""" + update ugcs SET geom = st_makevalid(geom) where end_ts is null + and not st_isvalid(geom) and begin_ts = :begin"""), + {"begin": valid}, ) - LOG.info("Fixed %s entries that were ST_Invalid()", cursor.rowcount) + LOG.info("Fixed %s entries that were ST_Invalid()", res.rowcount) - cursor.execute( - """ + res = cursor.execute( + sql_helper(""" UPDATE ugcs SET simple_geom = ST_Multi( ST_Buffer(ST_SnapToGrid(geom, 0.01), 0) ), centroid = ST_Centroid(geom), area2163 = ST_area( ST_transform(geom, 2163) ) / 1000000.0 - WHERE begin_ts = %s or area2163 is null - """, - (valid,), - ) - LOG.info( - "Updated simple_geom,centroid,area2163 for %s rows", cursor.rowcount + WHERE begin_ts = :valid or area2163 is null + """), + { + "valid": valid, + }, ) + LOG.info("Updated simple_geom,centroid,area2163 for %s rows", res.rowcount) # Check the last step that we don't have empty geoms, which 
happened once def _check(): """Do the check.""" - cursor.execute( - """ + return cursor.execute( + sql_helper(""" SELECT end_ts from ugcs - where begin_ts = %s and ( + where begin_ts = :valid and ( simple_geom is null or ST_IsEmpty(simple_geom) or ST_Area(simple_geom) / ST_Area(geom) < 0.9 ) - """, - (valid,), + """), + { + "valid": valid, + }, ) - _check() - if cursor.rowcount > 0: + res = _check() + if res.rowcount > 0: LOG.info( "%s rows with empty, too small simple_geom, decreasing tolerance", - cursor.rowcount, + res.rowcount, ) cursor.execute( - """ + sql_helper(""" UPDATE ugcs SET simple_geom = ST_Multi( ST_Buffer(ST_SnapToGrid(geom, 0.0001), 0) ) - WHERE begin_ts = %s and ( + WHERE begin_ts = :valid and ( simple_geom is null or ST_IsEmpty(simple_geom) or ST_Area(simple_geom) / ST_Area(geom) < 0.9 ) - """, - (valid,), + """), + { + "valid": valid, + }, ) - _check() - if cursor.rowcount > 0: + res = _check() + if res.rowcount > 0: LOG.info( "Found %s rows with empty simple_geom, FIXME SOMEHOW!", - cursor.rowcount, + res.rowcount, ) -def truncate(cursor, valid, ugc, source): +def truncate(cursor, valid, ugc, source, ctid): """Stop the bleeding.""" - cursor.execute( - "UPDATE ugcs SET end_ts = %s WHERE ugc = %s and end_ts is null " - "and source = %s", - (valid, ugc, source), + res = cursor.execute( + sql_helper(""" + UPDATE ugcs SET end_ts = :valid WHERE ugc = :ugc and end_ts is null + and source = :source and ctid = :ctid"""), + { + "valid": valid, + "ugc": ugc, + "source": source, + "ctid": ctid, + }, ) - return cursor.rowcount - + return res.rowcount -def workflow(argv, cursor): - """Go Main Go""" - # NWS correspondence indicates the date on the website is assumed to be - # an implementation time at 18 z of that date. 
- valid = utc(int(argv[2]), int(argv[3]), int(argv[4]), 18) - zipfn = f"{argv[1]}.zip" - shpfn = do_download(zipfn) - # track domain - source = zipfn[:2].replace("_", "") - LOG.info("Processing, using '%s' as the database source", source) +def read_shapefile(shpfn: str) -> pd.DataFrame: + """Standardized stuff.""" df = gpd.read_file(shpfn) # Ensure CRS is set df["geometry"] = df["geometry"].set_crs("EPSG:4326", allow_override=True) @@ -205,32 +216,73 @@ def workflow(argv, cursor): sys.exit() # make all columns upper df.columns = [x.upper() if x != "geometry" else x for x in df.columns] + return df + + +def chunk_df(df: pd.DataFrame): + for ugc, gdfin in df.groupby("ugc"): + gdf = gdfin.reset_index() + if len(gdf.index) == 1: + yield gdf.iloc[0].to_dict() + continue + LOG.info("Found %s rows for %s", len(gdf.index), ugc) + # If name and cwa are the same, we can just merge the polygons + if gdf["NAME"].nunique() == 1 and gdf["wfo"].nunique() == 1: + LOG.info("--> Merging geometry of %s", ugc) + yield { + "ugc": ugc, + "geometry": new_poly(gdf.union_all()), # type: ignore + "area2163": gdf["area2163"].sum(), + "NAME": gdf.iloc[0]["NAME"], + "STATE": gdf.iloc[0]["STATE"], + "wfo": gdf.iloc[0]["wfo"], + } + continue + LOG.info("--> Keeping all rows %s", ugc) + for row in gdf.to_dict("records"): + yield row + + +def workflow(pgconn, dt, filename): + """Go Main Go""" + zipfn = f"{filename}.zip" + shpfn = do_download(zipfn) + # track domain + source = zipfn[:2].replace("_", "") + LOG.info("Processing, using '%s' as the database source", source) + df = read_shapefile(shpfn) + # Compute the area and then sort to order duplicated UGCs :/ + # Database stores as sq km + df["area2163"] = df["geometry"].to_crs(2163).area / 1e6 + df = df.sort_values(by="area2163", ascending=False) # Compute the ugc column - if zipfn[:2] in ("mz", "oz", "hz"): + wfocol = "CWA" + if source in ["mz", "oz", "hz"]: df["STATE"] = "" df["ugc"] = df["ID"] wfocol = "WFO" - elif zipfn.startswith("c_"): + 
elif source == "c": geo_type = "C" df["ugc"] = df["STATE"] + geo_type + df["FIPS"].str.slice(-3) df["NAME"] = df["COUNTYNAME"] - wfocol = "CWA" else: geo_type = "Z" df["ugc"] = df["STATE"] + geo_type + df["ZONE"] - wfocol = "CWA" + df = df.rename(columns={wfocol: "wfo"}) # Check that UGCs are not all null if df["ugc"].isna().all(): LOG.info("Abort as all ugcs are null") sys.exit() - with get_sqlalchemy_conn("postgis") as pgconn: - postgis = gpd.read_postgis( - "SELECT * from ugcs where end_ts is null and source = %s", - pgconn, - params=(source,), - geom_col="geom", - index_col="ugc", - ) + postgis = gpd.read_postgis( + sql_helper( + "SELECT *, ctid from ugcs " + "where end_ts is null and source = :source" + ), + pgconn, + params={"source": source}, + geom_col="geom", + index_col=None, # We have dups at the UGC level + ) postgis["covered"] = False LOG.info( "Loaded %s '%s' type rows from the database", @@ -238,108 +290,102 @@ def workflow(argv, cursor): source, ) # Rectify WFO - df[wfocol] = df[wfocol].apply(lambda x: WFO_XREF.get(x, x)) + df["wfo"] = df["wfo"].apply(lambda x: WFO_XREF.get(x, x)) - # Compute the area and then sort to order duplicated UGCs :/ - # Database stores as sq km - df["area2163"] = df["geometry"].to_crs(2163).area / 1e6 - df.sort_values(by="area2163", ascending=False, inplace=True) - gdf = df.groupby("ugc").nth(0).set_index("ugc") - LOG.info( - "Loaded %s/%s unique entries from %s", - len(gdf.index), - len(df.index), - shpfn, - ) - countnew = 0 - countdups = 0 - for ugc, row in gdf.iterrows(): + counts = {"new": 0, "changed": 0, "has": 0} + # We need to dedup, but not in all cases, so alas + for row in chunk_df(df): + ugc = row["ugc"] name = row["NAME"].strip().replace("&", " and ") - if ugc in postgis.index: - postgis.at[ugc, "covered"] = True - # Some very small number, good enough - current = postgis.loc[ugc] - if isinstance(current, gpd.GeoDataFrame): - LOG.info("abort, more than one %s found in postgis", ugc) - sys.exit() - dirty = 
False - # arb size decision - if abs(row["area2163"] - current["area2163"]) > 0.2: - dirty = True - LOG.debug( - "%s updating sz diff %.2d -> %.2d", - ugc, - current["area2163"], - row["area2163"], - ) - elif name != current["name"]: - dirty = True - LOG.debug( - "%s updating due to name change %s -> %s", - ugc, - current["name"], - name, - ) - elif row[wfocol] != current["wfo"]: - dirty = True - LOG.debug( - "%s updating due to wfo change %s -> %s", - ugc, - current["wfo"], - row[wfocol], - ) - if not dirty: - countdups += 1 + newugc = ugc not in postgis["ugc"].values + current = postgis[ + (postgis["ugc"] == ugc) + & (postgis["wfo"] == row["wfo"]) + & (postgis["name"] == name) + ] + # This should never happen, but just in case. + if len(current.index) > 1: + LOG.warning("FATAL ERROR, found multiple rows for %s", ugc) + print(current) + sys.exit() + if len(current.index) == 1: + # Do a crude area check + if abs(row["area2163"] - current.iloc[0]["area2163"]) < 0.2: + postgis.at[current.index[0], "covered"] = True + counts["has"] += 1 continue + LOG.info( + "%s updating sz diff %.2d -> %.2d", + ugc, + current.iloc[0]["area2163"], + row["area2163"], + ) + counts["changed"] += 1 + else: + counts["new"] += 1 - res = truncate(cursor, valid, ugc, source) LOG.info( - "%s creating new entry for %s", - "Truncating old" if res > 0 else "", + "%s creating entry for %s[wfo=%s,isnew=%s]", ugc, + name, + row["wfo"], + newugc, ) # Finally, insert the new geometry - cursor.execute( - "INSERT into ugcs (ugc, name, state, begin_ts, wfo, geom, " - "source) VALUES (%s, %s, %s, %s, %s, " - "ST_Multi(ST_SetSRID(ST_GeomFromEWKT(%s),4326)), %s)", - ( - ugc, - name, - row["STATE"], - "1980-01-01" if res == 0 else valid, - row[wfocol], - new_poly(row["geometry"]).wkt, - source, + pgconn.execute( + sql_helper( + "INSERT into ugcs (ugc, name, state, begin_ts, wfo, geom, " + "source) VALUES (:ugc, :name, :state, :begints, :wfo, " + "ST_Multi(ST_SetSRID(ST_GeomFromEWKT(:geom),4326)), :source)" 
), + { + "ugc": ugc, + "name": name, + "state": row["STATE"], + "begints": "1980-01-01" if newugc else dt, + "wfo": row["wfo"], + "geom": new_poly(row["geometry"]).wkt, + "source": source, + }, ) - countnew += 1 + for _idx, row in postgis[~postgis["covered"]].iterrows(): + LOG.info("%s not found in update, truncating.", row["ugc"]) + truncate(pgconn, dt, row["ugc"], source, row["ctid"]) - for ugc, _row in postgis[~postgis["covered"]].iterrows(): - LOG.info("%s not found in update, truncating.", ugc) - truncate(cursor, valid, ugc, source) - - LOG.info("NEW: %s Dups: %s", countnew, countdups) + LOG.info( + "NEW: %s Updated: %s Has: %s", + counts["new"], + counts["changed"], + counts["has"], + ) - db_fixes(cursor, valid) + db_fixes(pgconn, dt) -def main(argv): +@click.command() +@click.option( + "--date", + "dt", + required=True, + type=click.DateTime(), + help="Date of the data", +) +@click.option("--filename", required=True, help="Zip file name (no extension)") +@click.option("--dryrun", is_flag=True, help="Dry run") +def main(dt: datetime, filename: str, dryrun: bool) -> None: """Go Main Go""" - if len(argv) != 5: - LOG.info("ERROR: You need to specify the file date to download + date") - LOG.info("Example: python ugcs_update.py z_01dec10 2010 12 01") - sys.exit(0) - - pgconn, cursor = get_dbconnc("postgis") - workflow(argv, cursor) - cursor.close() - pgconn.commit() - pgconn.close() + # Assumption is 18 UTC implementation timestamp + dt = dt.replace(hour=18, tzinfo=timezone.utc) + with get_sqlalchemy_conn("postgis") as pgconn: + pgconn.row_factory = dict_row + workflow(pgconn, dt, filename) + if not dryrun: + pgconn.commit() + else: + LOG.warning("---- DRY RUN, no commit ----") LOG.info("Done!") if __name__ == "__main__": - # Get the name of the file we wish to download - main(sys.argv) + main()