Skip to content

Commit

Permalink
allow no data in call and non-timeseries result
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Jan 22, 2025
1 parent d5b112c commit 3b65231
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 59 deletions.
77 changes: 57 additions & 20 deletions corebridge/aicorebridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,28 @@ def _init_processor(
self.processor_signature = inspect.signature(self.processor)
self.processor_params = dict(self.processor_signature.parameters)
self.return_param = self.processor_params.pop('return', None)

self.data_param, *self.call_params = list(self.processor_params.keys())

if not (
self.processor_params[self.data_param].annotation == pd.DataFrame
or self.processor_params[self.data_param].annotation == np.ndarray

):

self.data_param = None
self.call_params = list(self.processor_params.keys())



# %% ../nbs/01_aicorebridge.ipynb 15
# can be overloaded
@patch
def call_processor(self:AICoreModule, calldata, **callargs):
return self.processor(calldata, **callargs)
if self.data_param:
return self.processor(calldata, **callargs)
else:
return self.processor(**callargs)


# %% ../nbs/01_aicorebridge.ipynb 17
Expand Down Expand Up @@ -149,40 +163,60 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs):
recordformat=recordformat,
timezone=timezone)

msg.append(f"calldata shape: {calldata.shape}")
#msg.append(f"calldata shape: {calldata.shape}")

history = build_historic_args(calldata, kwargs.pop('history', {}))

callargs = self.get_callargs(kwargs, history)

for arg, val in callargs.items():
msg.append(f"{arg}: {val}")

result = timeseries_dataframe(
self.call_processor(
calldata,
**callargs),
timezone=timezone)

msg.append(f"result shape: {result.shape}")
calculated_result = self.call_processor(
calldata,
**callargs
)

if samplerMethod:
msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)
try:
result = timeseries_dataframe(
calculated_result,
timezone=timezone)

msg.append(f"result shape: {result.shape}")

msg.append(f"return-data shape: {result.shape}")
if samplerMethod:
msg.append(f"Sampler: {samplerMethod}, period: {samplerPeriod}")
result = timeseries_dataframe_resample(result, samplerPeriod, samplerMethod)

if reversed:
result = result[::-1]
msg.append(f"return-data shape: {result.shape}")

if reversed:
result = result[::-1]

return {
'msg':msg,
'data': timeseries_dataframe_to_datadict(
result if not lastSeen else result[-1:],
recordformat=recordformat,
timezone=timezone,
popNaN=True)
}

# tries dataframe return
except Exception as err:
msg.append(f"No timeseries data, error={err}")

df = pd.DataFrame(calculated_result)
df
df.columns = [f"value_{str(c)}" if isinstance(c, int) else str(c) for c in list(df.columns)]
df.reset_index().to_dict(orient='records')
return {
'msg':msg,
'data': timeseries_dataframe_to_datadict(
result if not lastSeen else result[-1:],
recordformat=recordformat,
timezone=timezone,
popNaN=True)
'data': df.reset_index().to_dict(orient='records')
}


# function try-catch
except Exception as err:
msg.append(''.join(traceback.format_exception(None, err, err.__traceback__)))
syslog.exception(f"Exception {str(err)} in infer()")
Expand Down Expand Up @@ -264,6 +298,9 @@ def get_call_data(
timezone='UTC'):

"Convert data to the processor signature"

if not self.data_param:
return None

df = set_time_index_zone(timeseries_dataframe_from_datadict(
data, ['datetimeMeasure', 'time'], recordformat), timezone)
Expand Down
Loading

0 comments on commit 3b65231

Please sign in to comment.