From 0280b447a478e204b24131778ead9093221ff028 Mon Sep 17 00:00:00 2001 From: akrherz Date: Fri, 8 Dec 2023 12:37:16 -0600 Subject: [PATCH] fix: new station insert for bufr surface closes #190 --- parsers/pywwa/workflows/bufr_surface.py | 66 ++++++++++++++++--------- tests/workflows/test_bufr_surface.py | 6 ++- 2 files changed, 47 insertions(+), 25 deletions(-) diff --git a/parsers/pywwa/workflows/bufr_surface.py b/parsers/pywwa/workflows/bufr_surface.py index 2e5b4464..bc8ae788 100644 --- a/parsers/pywwa/workflows/bufr_surface.py +++ b/parsers/pywwa/workflows/bufr_surface.py @@ -207,13 +207,17 @@ def load_xref(txn): LOG.info("Loaded %s WIGOS2IEMID entries", len(WIGOS)) -def add_station_cb(res, sid): +def add_station_cb(iemid, sid): """Callback.""" - WIGOS[sid] = {"iemid": res.fetchone()["iemid"], "tzname": None} + WIGOS[sid] = {"iemid": iemid, "tzname": None} -def add_station(sid, data): - """Add a mesosite station entry.""" +def add_station(sid, data, mcursor=None): + """Add a mesosite station entry. + + Args: + mcursor (cursor): for testing, a mesosite cursor to use. + """ if "lon" not in data or "lat" not in data: LOG.info("Skipping %s as no location data", sid) WIGOS[sid] = {"iemid": -2} @@ -225,30 +229,43 @@ def add_station(sid, data): return sname = sname.replace(",", " ") elev = data.get("elevation") - df = MESOSITEDB.runOperation( - """ + sql = """ INSERT into stations(id, wigos, name, network, online, geom, elevation, metasite, plot_name, country) VALUES (%s, %s, %s, %s, 't', ST_Point(%s, %s, 4326), %s, 'f', %s, 'UN') returning iemid - """, - ( - sid, - sid, - sname, - NETWORK, - data["lon"], - data["lat"], - elev, - sname, - ), + """ + args = ( + sid, + sid, + sname, + NETWORK, + data["lon"], + data["lat"], + elev, + sname, ) - df.addCallback(add_station_cb, sid) - df.addErrback(common.email_error, f"{sid} {data}") + + def _insert(txn): + """Do the insert.""" + txn.execute(sql, args) + return txn.fetchone()["iemid"] + + if mcursor is None: + # no coverage for this :( + df = MESOSITEDB.runInteraction(_insert) + df.addCallback(add_station_cb, sid) + df.addErrback(common.email_error, f"{sid} {data}") + else: + add_station_cb(_insert(mcursor), sid) -def process_datalist(txn, prod, datalist): - """Do what we can do.""" +def process_datalist(txn, prod, datalist, mcursor=None): + """Do what we can do. + + Args: + mcursor (cursor): for testing, a mesosite cursor to use. + """ data = datalist2iemob_data(datalist, prod.source) if not data: return @@ -271,7 +288,7 @@ def process_datalist(txn, prod, datalist): # prevent race condition WIGOS[sid] = {"iemid": -1} meta = WIGOS[sid] - add_station(sid, data) + add_station(sid, data, mcursor) if meta["iemid"] < -1: # add station failed, so do nothing return @@ -466,12 +483,13 @@ def bmsg2datalists(bmsg) -> list: return res -def workflow(prodbytes, cursor=None): +def workflow(prodbytes, cursor=None, mcursor=None): """This processes the LDM product provided via ldmbridge. Args: prodbytes (bytes): The LDM product cursor (cursor, optional): The database cursor to use. + mcursor (cursor, optional): The mesosite database cursor to use. """ pos = prodbytes.find(b"BUFR") if pos == -1: @@ -493,7 +511,7 @@ def workflow(prodbytes, cursor=None): continue datalists.append(datalist) if cursor: - process_datalist(cursor, prod, datalist) + process_datalist(cursor, prod, datalist, mcursor) continue # Queue this up for processing defer = IEMDB.runInteraction(process_datalist, prod, datalist) diff --git a/tests/workflows/test_bufr_surface.py b/tests/workflows/test_bufr_surface.py index fe17b61a..66406031 100644 --- a/tests/workflows/test_bufr_surface.py +++ b/tests/workflows/test_bufr_surface.py @@ -2,14 +2,18 @@ import os import pytest +from pyiem.database import get_dbconnc from pywwa.testing import get_example_filepath from pywwa.workflows import bufr_surface def sync_workflow(cursor, buffn) -> int: """Run twice through workflow, eh""" + mconn, mcursor = get_dbconnc("mesosite") + bufr_surface.load_xref(mcursor) with open(get_example_filepath(f"BUFR/{buffn}"), "rb") as fh: - prod, datalists = bufr_surface.workflow(fh.read(), cursor) + prod, datalists = bufr_surface.workflow(fh.read(), cursor, mcursor) + mconn.close() obs = [] for datalist in datalists: obs.append(bufr_surface.datalist2iemob_data(datalist, prod.source))