@@ -75,13 +75,13 @@ def store(weather_ds, meta_df):
75
75
76
76
combined_ds = _combine_geo_weather_meta (weather_ds , meta_df )
77
77
78
-
79
78
if not os .path .exists (os .path .join (METOROLOGICAL_DOWNLOAD_PATH , ".zmetadata" )): # no zstore in directory
80
79
print ("Creating Zarr" )
81
80
82
81
combined_ds .to_zarr (
83
82
store = METOROLOGICAL_DOWNLOAD_PATH ,
84
83
group = f"{ group } -{ periodicity } " ,
84
+ mode = "w-" , # only use for first time creating store
85
85
)
86
86
else : # store already exists
87
87
print ("adding to store" )
@@ -90,6 +90,7 @@ def store(weather_ds, meta_df):
90
90
stored_ds = xr .open_zarr (
91
91
store = METOROLOGICAL_DOWNLOAD_PATH ,
92
92
group = f"{ group } -{ periodicity } " ,
93
+ # consolidated=True
93
94
)
94
95
95
96
lat_lon_gid_2d_map = _make_coords_to_gid_da (ds_from_zarr = stored_ds )
@@ -104,57 +105,42 @@ def store(weather_ds, meta_df):
104
105
105
106
if lat_exists and lon_exists :
106
107
print ("(lat, lon) exists already" )
107
- stored_gid = lat_lon_gid_2d_map .sel (latitude = target_lat , longitude = target_lon )
108
108
109
- # overwrite previous value at that lat-lon, keeps old gid
109
+ raise NotImplementedError
110
+
111
+ # stored_gid = lat_lon_gid_2d_map.sel(latitude=target_lat, longitude=target_lon)
112
+
113
+ # # overwrite previous value at that lat-lon, keeps old gid
114
+
115
+ # # need to set the gid of the current "sheet" to the stored gid
116
+ # updated_entry = combined_ds.loc[{"gid": gid}].assign_coords({"gid": stored_gid}) # this may need to be a list s.t. [stored_gid]
117
+ # # loc may remove the gid dimension so we might have to add it back with .expand_dims
118
+
119
+ # # overwrite the current entry at the gid = stored_gid entry of the zarr
120
+ # updated_entry.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode='w')
110
121
111
- # will this be a view
112
- # how can we assign the value
113
- # cant slice?
114
- stored_ds .sel (gid = stored_gid )[:] = combined_ds .sel (gid = gid ).values ()
115
122
116
123
else : # coordinate pair doesnt exist and it needs to be added, this will be a HEAVY operation
117
124
print ("add entry to dataset" )
118
125
119
126
# we are trying to save 1 "sheet" of weather (weather at a single gid)
120
127
# need to update the index to fit into the stored data after we concatenate
121
- # we want to update the arbitrary gid in the input (combined_ds) to the next index in the gid array (starts at 0, current_gid + 1 = sizes["gid"] = new gid)
122
- new_gid = stored_ds .sizes ["gid" ]
123
-
124
- # combined_ds.sel(gid=gid) = combined_ds.sel(gid=gid).assign_coords(gid=[new_gid]) # we may have the issues with this sel returning a view
125
- updated_entry = combined_ds .sel (gid = gid ).assign_coords (gid = [new_gid ])
126
128
127
- stored_ds = xr .concat ([stored_ds , updated_entry ], dim = "gid" )
129
+ # this concatenates along the gid axis
130
+ # gid has no guarantee of being unique but duplicate gids are fine for xarray
131
+ # we slice so that we get a Dataset with dimensions (gid, time); indexing to grab a single gid would drop the gid dimension
132
+ new_gid = stored_ds .sizes ["gid" ]
128
133
129
- # trigger rechunking
130
- # should this happen outside of the loop
131
- stored_ds = stored_ds . chunk ()
134
+ weather_sheet = combined_ds . sel ( gid = slice ( gid ))
135
+ updated_entry = weather_sheet . assign_coords ({ "gid" : [ new_gid ]})
136
+ updated_entry . to_zarr ( store = METOROLOGICAL_DOWNLOAD_PATH , group = f" { group } - { periodicity } " , mode = "a" , append_dim = "gid" )
132
137
133
- # SAVE DATASET BACK TO STORE
134
- stored_ds .to_zarr (METOROLOGICAL_DOWNLOAD_PATH , group = f"{ group } -{ periodicity } " , mode = 'w' ) # test with "a" probably wont work
138
+ # new_entry_added_ds = xr.concat([stored_ds, updated_entry], dim="gid")
135
139
140
+ # new_entry_added_ds.to_zarr(store=METOROLOGICAL_DOWNLOAD_PATH, group=f"{group}-{periodicity}", mode="a", append_dim="gid")
141
+
136
142
print (f"dataset saved to zarr store at { METOROLOGICAL_DOWNLOAD_PATH } " )
137
143
138
- ### THIS NEEDS TO BE DEPRECATED
139
- def _add_entry_to_ds (combined_ds , stored_ds , target_lat , target_lon , gid ):
140
-
141
- new_gid = stored_ds .sizes ["gid" ] # zero indexed so the next index will be the current size
142
-
143
- # new_entry = combined_ds.sel(gid=gid).expand_dims(gid=new_gid)
144
-
145
- # for var in new_entry.data_vars:
146
- # existing_data = stored_ds[var]
147
- # new_data = new_entry[var]
148
-
149
- # updated_data = xr.concat([existing_data, new_data], dim='gid')
150
- stored_ds = xr .concat ([stored_ds , combined_ds .sel (gid = gid )], dim = "gid" )
151
-
152
- # stored_ds[var] = updated_datag
153
-
154
- # stored_ds['latitude'] = xr.concat([stored_ds['latitude'], xr.DataArray([target_lat], dims='gid')], dim='gid')
155
- # stored_ds['longitude'] = xr.concat([stored_ds['longitude'], xr.DataArray([target_lon], dims='gid')], dim='gid')
156
-
157
-
158
144
159
145
def check_store ():
160
146
"""Check if you have a zarr store at the default download path defined in pvdeg.config"""
@@ -189,8 +175,10 @@ def _combine_geo_weather_meta(
189
175
):
190
176
"""Combine weather dataset and meta dataframe into a single dataset"""
191
177
178
+ # if meta_df.index.name == 'index':
192
179
meta_ds = xr .Dataset .from_dataframe (meta_df ).rename ({'index' : 'gid' })
193
180
181
+
194
182
combined = xr .merge ([weather_ds , meta_ds ]).assign_coords (
195
183
latitude = ("gid" , meta_ds .latitude .values ),
196
184
longitude = ('gid' , meta_ds .longitude .values ),
0 commit comments