import xarray as xr
import pandas as pd
import numpy as np
+ import dask.array as da
import zarr
import os
@@ -74,20 +75,90 @@ def store(weather_ds, meta_df):
    combined_ds = _combine_geo_weather_meta(weather_ds, meta_df)

-     # what mode should this be
-     # we want to add to indexes if need be or overwrite old ones
-     combined_ds.to_zarr(
-         store=METOROLOGICAL_DOWNLOAD_PATH,
-         group=f"{group}-{periodicity}"
-     )
+
+     if not os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):  # no zarr store in the directory
+         print("Creating Zarr")
+
+         combined_ds.to_zarr(
+             store=METOROLOGICAL_DOWNLOAD_PATH,
+             group=f"{group}-{periodicity}",
+         )
+     else:  # store already exists
+         print("adding to store")
+
+         print("opening store")
+         stored_ds = xr.open_zarr(
+             store=METOROLOGICAL_DOWNLOAD_PATH,
+             group=f"{group}-{periodicity}",
+         )
+
+         lat_lon_gid_2d_map = _make_coords_to_gid_da(ds_from_zarr=stored_ds)
+
+         for gid, values in meta_df.iterrows():
+
+             target_lat = values["latitude"]
+             target_lon = values["longitude"]
+
+             lat_exists = np.any(lat_lon_gid_2d_map.latitude == target_lat)
+             lon_exists = np.any(lat_lon_gid_2d_map.longitude == target_lon)
+
+             if lat_exists and lon_exists:
+                 print("(lat, lon) exists already")
+                 stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)
+
+                 # overwrite the previous values at that (lat, lon), keeping the old gid;
+                 # .sel() returns a new object rather than a writable view, so assign
+                 # through .loc on each data variable (loading dask-backed variables first)
+                 for var in combined_ds.data_vars:
+                     stored_ds[var] = stored_ds[var].load()
+                     stored_ds[var].loc[dict(gid=int(stored_gid))] = combined_ds[var].sel(gid=gid).values
+
+             else:  # coordinate pair doesn't exist and needs to be added; this is a HEAVY operation
+                 print("add entry to dataset")
+
+                 # we are saving one "sheet" of weather (weather at a single gid);
+                 # its arbitrary input gid must be renumbered to the next free index
+                 # before concatenating, and gids are zero-indexed, so the next free
+                 # index equals the current size
+                 new_gid = stored_ds.sizes["gid"]
+
+                 # select with a list to keep the gid dimension (a scalar .sel()
+                 # would drop it), then relabel the coordinate to the new gid
+                 updated_entry = combined_ds.sel(gid=[gid]).assign_coords(gid=[new_gid])
+
+                 stored_ds = xr.concat([stored_ds, updated_entry], dim="gid")
+
+             # trigger rechunking; arguably this belongs outside the loop, since
+             # only the final result is written back
+             stored_ds = stored_ds.chunk()
+
+         # SAVE DATASET BACK TO STORE; mode='w' rewrites the whole group,
+         # since appending with mode='a' probably won't handle the renumbered gid index
+         stored_ds.to_zarr(METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')

    print(f"dataset saved to zarr store at {METOROLOGICAL_DOWNLOAD_PATH}")
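
# A minimal, self-contained sketch (not part of this module) of the two
# update paths above on a toy dataset; the names and values are illustrative.
def _sketch_update_paths():
    stored = xr.Dataset(
        {"temp": (("gid", "time"), np.zeros((2, 3)))},
        coords={"gid": [0, 1], "time": range(3)},
    )
    incoming = xr.Dataset(
        {"temp": (("gid", "time"), np.ones((1, 3)))},
        coords={"gid": [7], "time": range(3)},  # arbitrary input gid
    )

    # overwrite path: write into an existing gid through .loc
    # (in-memory variables only; dask-backed variables must be loaded first)
    stored["temp"].loc[dict(gid=1)] = incoming["temp"].sel(gid=7).values

    # append path: renumber the input gid to the next free index, keeping
    # the gid dimension by selecting with a list, then concatenate
    new_gid = stored.sizes["gid"]
    entry = incoming.sel(gid=[7]).assign_coords(gid=[new_gid])
    stored = xr.concat([stored, entry], dim="gid")
    return stored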
+ ### THIS NEEDS TO BE DEPRECATED
+ def _add_entry_to_ds(combined_ds, stored_ds, target_lat, target_lon, gid):
+
+     new_gid = stored_ds.sizes["gid"]  # zero-indexed, so the next index is the current size
+
+     # new_entry = combined_ds.sel(gid=gid).expand_dims(gid=new_gid)
+
+     # for var in new_entry.data_vars:
+     #     existing_data = stored_ds[var]
+     #     new_data = new_entry[var]
+
+     #     updated_data = xr.concat([existing_data, new_data], dim='gid')
+     stored_ds = xr.concat([stored_ds, combined_ds.sel(gid=gid)], dim="gid")
+
+     #     stored_ds[var] = updated_data
+
+     # stored_ds['latitude'] = xr.concat([stored_ds['latitude'], xr.DataArray([target_lat], dims='gid')], dim='gid')
+     # stored_ds['longitude'] = xr.concat([stored_ds['longitude'], xr.DataArray([target_lon], dims='gid')], dim='gid')
+
def check_store():
    """Check if you have a zarr store at the default download path defined in pvdeg.config"""
-     if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zattrs")):
+     if os.path.exists(os.path.join(METOROLOGICAL_DOWNLOAD_PATH, ".zmetadata")):

        size = sum(f.stat().st_size for f in METOROLOGICAL_DOWNLOAD_PATH.glob('**/*') if f.is_file())
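
# Note: ".zmetadata" is zarr's consolidated-metadata file, which recent xarray
# versions write by default from to_zarr(). A more defensive existence check
# (an illustrative helper, not part of this module) could also accept stores
# written without consolidation by falling back to the ".zgroup" marker:
def _zarr_store_exists(path):
    return any(
        os.path.exists(os.path.join(path, marker))
        for marker in (".zmetadata", ".zgroup")
    )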
@@ -118,17 +189,15 @@ def _combine_geo_weather_meta(
):
    """Combine weather dataset and meta dataframe into a single dataset"""

-     meta_ds = xr.Dataset.from_dataframe(meta_df)
-     # we could use an encoding scheme here; don't need to store source? unless the zarr compression handles it for us
-
-     meta_ds['gid'] = meta_ds['index'].values.astype(np.int32)
-     meta_ds = meta_ds.drop_vars(["index"])
+     meta_ds = xr.Dataset.from_dataframe(meta_df).rename({'index': 'gid'})

    combined = xr.merge([weather_ds, meta_ds]).assign_coords(
        latitude=("gid", meta_ds.latitude.values),
        longitude=('gid', meta_ds.longitude.values),
    )

+     combined["Source"] = combined["Source"].astype(str)  # save as fixed-width strings, which zarr stores without an object codec
+
    return combined
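
# A toy round trip of the meta conversion above (illustrative values, not part
# of this module): from_dataframe() turns the unnamed RangeIndex into an
# "index" dimension, rename() relabels it to "gid", and astype(str) replaces
# the object dtype with fixed-width unicode that zarr can store natively.
def _sketch_meta_to_ds():
    demo_meta = pd.DataFrame(
        {
            "latitude": [39.7, 40.0],
            "longitude": [-105.2, -105.0],
            "Source": ["NSRDB", "NSRDB"],
        }
    )
    demo_ds = xr.Dataset.from_dataframe(demo_meta).rename({"index": "gid"})
    demo_ds["Source"] = demo_ds["Source"].astype(str)  # object -> "<U5"
    return demo_ds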
@@ -140,6 +209,8 @@ def _seperate_geo_weather_meta(
    and separate it into `weather_ds` and `meta_df`.
    """

+     ds_from_zarr["Source"] = ds_from_zarr["Source"].astype(object)  # geospatial.mapblocks needs this to be an object
+
    # there may be a more optimal way to do this
    data = np.column_stack(
        [
@@ -163,7 +234,6 @@ def _make_coords_to_gid_da(
):
    """Create a 2D indexable array that maps coordinates (lat and lon) to gid stored in the zarr store"""

-     import dask.array as da

    # only want to do this if the arrays are dask arrays
    lats = ds_from_zarr.latitude.to_numpy()
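
# The hunk is truncated here. A minimal sketch of one way such a map could be
# built, assuming one gid per unique (lat, lon) pair; the fill value and names
# below are illustrative, not the module's actual implementation:
def _sketch_coords_to_gid(ds_from_zarr):
    lats = ds_from_zarr.latitude.to_numpy()
    lons = ds_from_zarr.longitude.to_numpy()
    gids = ds_from_zarr.gid.to_numpy()

    unique_lats, unique_lons = np.unique(lats), np.unique(lons)
    grid = np.full((unique_lats.size, unique_lons.size), -1, dtype=np.int64)  # -1 = no gid
    grid[np.searchsorted(unique_lats, lats), np.searchsorted(unique_lons, lons)] = gids

    return xr.DataArray(
        grid,
        coords={"latitude": unique_lats, "longitude": unique_lons},
        dims=("latitude", "longitude"),
    )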