Commit 37f1486

distributed NSRDB skeleton

1 parent 19f2788

File tree

4 files changed: +679 −3252 lines changed

pvdeg/store.py (+52 −6)
@@ -6,6 +6,8 @@
 import zarr
 import os

+from pvdeg.weather import pvgis_hourly_empty_weather_ds
+
 from pvdeg import METOROLOGICAL_DOWNLOAD_PATH

 def get(group):
@@ -134,11 +136,7 @@ def store(weather_ds, meta_df):
     weather_sheet = combined_ds.sel(gid=slice(gid))
     updated_entry = weather_sheet.assign_coords({"gid": [new_gid]})
     updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")
-
-    # new_entry_added_ds = xr.concat([stored_ds, updated_entry], dim="gid")
-
-    # new_entry_added_ds.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")
-
+
     print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
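The `to_zarr(..., mode="a", append_dim="gid")` call above grows the store along the gid dimension rather than rewriting it. A minimal standalone sketch of the same xarray pattern (the store path, group name, and variable name here are illustrative, not taken from pvdeg):

import numpy as np
import xarray as xr

store_path = "example_weather.zarr"  # hypothetical local store path

# seed the store with a single-gid dataset
ds = xr.Dataset(
    data_vars={"temp_air": (("gid", "time"), np.zeros((1, 8760)))},
    coords={"gid": [0], "time": np.arange(8760)},
)
ds.to_zarr(store_path, group="NSRDB-hourly", mode="w")

# relabel a new sheet with the next gid and append it along the gid dimension
new_sheet = ds.assign_coords({"gid": [1]})
new_sheet.to_zarr(store_path, group="NSRDB-hourly", mode="a", append_dim="gid")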

@@ -240,4 +238,52 @@ def _make_coords_to_gid_da(

     points.set_index(latitude="latitude", longitude="longitude")

-    return points
+    return points
+
+
+def _create_sample_sheet(fill_value, latitude: float = 999, longitude: float = 999, altitude: float = -1, wind_height: int = -1, Source: str = "SampleSheet"):
+    """
+    Create a dummy sample dataset containing weather for one gid. This is called a "sheet": a single location of weather data from the dataset, with the gid coordinate still present.
+
+    The sizes of the dimensions of the sheet will be {"gid": 1, "time": 8760}.
+
+    Parameters
+    ----------
+    fill_value : numeric
+        value to populate the single weather_ds sheet with
+    latitude : float
+        dummy latitude (WGS84)
+    longitude : float
+        dummy longitude (WGS84)
+    altitude : float
+        dummy altitude of measured data [m]
+    wind_height : int
+        dummy height of the sample dataset's wind measurement
+    Source : str
+        dummy source label stored in the metadata
+
+    Returns
+    -------
+    sheet_ds : xr.Dataset
+        Dummy weather data sheet for a single location using a dask array backend. As mentioned above, this maintains the gid coordinate.
+    meta_df : pd.DataFrame
+        Dummy metadata for the test location in a pandas.DataFrame.
+    """
+
+    meta_dict = {
+        'latitude': latitude,
+        'longitude': longitude,
+        'altitude': altitude,
+        'wind_height': wind_height,
+        'Source': Source,
+    }
+
+    meta_df = pd.DataFrame(meta_dict, index=[0])
+
+    sheet_ds = pvgis_hourly_empty_weather_ds(gids_size=1)
+
+    dummy_da = da.full(shape=(1, sheet_ds.sizes["time"]), fill_value=fill_value)
+
+    for var in sheet_ds.data_vars:
+        dim = sheet_ds[var].dims
+        sheet_ds[var] = (dim, dummy_da)
+
+    return sheet_ds, meta_df
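A quick sketch of how the new helper might be exercised (this assumes store.py imports pandas as `pd` and dask.array as `da` at module top, which this hunk does not show, and that the function returns the pair described in its docstring):

from pvdeg.store import _create_sample_sheet

sheet_ds, meta_df = _create_sample_sheet(fill_value=1.0)

print(sheet_ds.sizes)            # expected: {"gid": 1, "time": 8760}
print(meta_df.loc[0, "Source"])  # "SampleSheet"

# values stay lazy until computed, thanks to the dask array backend
var = list(sheet_ds.data_vars)[0]
print(float(sheet_ds[var].isel(gid=0, time=0).compute()))  # 1.0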

pvdeg/weather.py (+35 −10)
@@ -975,6 +975,8 @@ def process_pvgis_distributed(weather_df):
 def _weather_distributed_vec(
     database: str,
     coord: tuple[float],
+    api_key: str,  # NSRDB api key
+    email: str     # NSRDB developer email
 ):
     """
     Distributed weather calculation for use with dask futures/delayed
@@ -984,7 +986,13 @@
     """

     try:
-        weather_df, meta_dict = get(database, coord)
+        if (database == "PVGIS"):  # does not need api key
+            weather_df, meta_dict = get(database=database, id=coord)
+        elif (database == "PSM3"):
+            weather_df, meta_dict = get(database=database, id=coord, api_key=api_key, email=email)
+        else:
+            raise NotImplementedError(f'database {database} not implemented, options: "PVGIS", "PSM3"')
+
     except Exception as e:
         return None, None, e
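Returning `(None, None, e)` instead of raising lets every delayed task complete, so the caller can separate failures after `dask.compute`. A sketch of that pattern, assuming successful tasks return `(weather_df, meta_dict, None)` triples (the success path is not fully visible in this hunk):

# hypothetical post-processing of dask results, one triple per coordinate
results = [
    ({"ghi": [412]}, {"latitude": 49.95, "longitude": 1.5}, None),  # success
    (None, None, TimeoutError("NSRDB request timed out")),          # failure
]

successes = [(w, m) for w, m, err in results if err is None]
failures = [err for _, _, err in results if err is not None]

print(f"{len(successes)} succeeded, {len(failures)} failed")
for err in failures:
    print(f"  {type(err).__name__}: {err}")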

@@ -1040,18 +1048,26 @@ def pvgis_hourly_empty_weather_ds(gids_size):

 # add some check to see if a dask client exists
 # can force user to pass dask client to ensure it exists
-
 # if called without dask client we will return a xr.Dataset
 # with dask backend that does not appear as if it failed until we compute it
-def weather_distributed(database, coords):
+
+# TODO: implement rate throttling so we do not make too many requests.
+# TODO: multiple API keys to get around the NSRDB key rate limit. 2 key/email pairs means twice the speed ;)
+def weather_distributed(
+    database: str,
+    coords: list[tuple],
+    api_key: str = None,
+    email: str = None
+):
     """
     Grab weather using PVGIS for all of the given locations, using dask for parallelization.
     You must create a dask client with multiple processes before calling this function, otherwise results will not be properly calculated.

     PVGIS supports up to 30 requests per second, so your dask client should not have more
     workers/threads than would put you over this limit.

-    With eventual NSRDB implementation API keys will be an issue, each key is rate limited.
+    NSRDB (including `database="PSM3"`) is rate limited and your key will face restrictions after making too many requests.
+    See rates [here](https://developer.nrel.gov/docs/solar/nsrdb/guide/).

     Parameters
     ----------
@@ -1060,14 +1076,23 @@ def weather_distributed(database, coords):
     coords: list[tuple]
         list of tuples containing (latitude, longitude) coordinates

-        >>> [
+        .. code-block:: python
+
+            coords_example = [
                 (49.95, 1.5),
                 (51.95, -9.5),
                 (51.95, -8.5),
                 (51.95, -4.5),
-                (51.95, -3.5),
-            ]
+                (51.95, -3.5)]

+    api_key: str
+        Only required when making NSRDB requests using "PSM3".
+        [NSRDB developer API key](https://developer.nrel.gov/signup/)
+
+    email: str
+        Only required when making NSRDB requests using "PSM3".
+        [NSRDB developer account email associated with `api_key`](https://developer.nrel.gov/signup/)
+
     Returns
     --------
     weather_ds : xr.Dataset
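One way the rate-throttling TODO from the earlier hunk might eventually look is a simple client-side limiter that spaces out requests. This is a sketch, not part of this commit:

import time

class RateLimiter:
    """Block callers so requests go out at no more than `rate` per second."""

    def __init__(self, rate: float):
        self.min_interval = 1.0 / rate
        self._last = 0.0

    def wait(self) -> None:
        # sleep just long enough to respect the minimum spacing between calls
        sleep_for = self.min_interval - (time.monotonic() - self._last)
        if sleep_for > 0:
            time.sleep(sleep_for)
        self._last = time.monotonic()

limiter = RateLimiter(rate=30)  # PVGIS allows up to 30 requests per second
for _ in range(3):
    limiter.wait()
    # ...make one weather request here...

Note that a limiter like this is per process; coordinating a shared request budget across dask workers would need a distributed primitive such as `dask.distributed.Semaphore`.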
@@ -1080,10 +1105,10 @@

     import dask.delayed

-    if (database != "PVGIS"):
-        raise NotImplementedError(f"Only 'PVGIS' is implemented, you entered {database}")
+    if (database != "PVGIS" and database != "PSM3"):
+        raise NotImplementedError(f"Only 'PVGIS' and 'PSM3' are implemented, you entered {database}")

-    futures = [dask.delayed(_weather_distributed_vec)("PVGIS", coord) for coord in coords]
+    futures = [dask.delayed(_weather_distributed_vec)(database, coord, api_key, email) for coord in coords]
     results = dask.compute(futures)[0]  # values are returned in a list with one entry

     # what is the difference between these two approaches for dask distributed work, how can we schedule differently
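Putting the new signature together, usage might look like the sketch below (assuming `weather_distributed` returns the `weather_ds`/metadata pair its docstring describes, only part of which is visible in this diff; the key and email are placeholders):

from dask.distributed import Client
from pvdeg.weather import weather_distributed

client = Client(n_workers=4)  # multi-process client, created before the call as required

coords = [
    (49.95, 1.5),
    (51.95, -9.5),
]

# PVGIS needs no credentials
weather_ds, meta_df = weather_distributed(database="PVGIS", coords=coords)  # assumed return pair

# PSM3 (NSRDB) additionally needs a developer key and its account email
weather_ds, meta_df = weather_distributed(
    database="PSM3",
    coords=coords,
    api_key="DEMO_KEY",       # placeholder
    email="you@example.com",  # placeholder
)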
