Skip to content

Commit

Permalink
export
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Jul 10, 2024
1 parent bdb7198 commit d96cf00
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 16 deletions.
11 changes: 9 additions & 2 deletions corebridge/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,12 @@
'corebridge/core.py'),
'corebridge.core.timeseries_dataframe_to_datadict': ( 'core.html#timeseries_dataframe_to_datadict',
'corebridge/core.py')},
'corebridge.rscript': { 'corebridge.rscript.check_script_inputs': ( 'rscriptbridge.html#check_script_inputs',
'corebridge/rscript.py')}}}
'corebridge.rscript': { 'corebridge.rscript.calc_hash_from_flowobject': ( 'rscriptbridge.html#calc_hash_from_flowobject',
'corebridge/rscript.py'),
'corebridge.rscript.check_script_inputs': ( 'rscriptbridge.html#check_script_inputs',
'corebridge/rscript.py'),
'corebridge.rscript.check_script_output': ( 'rscriptbridge.html#check_script_output',
'corebridge/rscript.py'),
'corebridge.rscript.install_R_package': ( 'rscriptbridge.html#install_r_package',
'corebridge/rscript.py'),
'corebridge.rscript.run_script': ('rscriptbridge.html#run_script', 'corebridge/rscript.py')}}}
37 changes: 26 additions & 11 deletions corebridge/aicorebridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,23 @@
except:
pass

# %% ../nbs/01_aicorebridge.ipynb 7
def build_historic_args(data, history):
# %% ../nbs/01_aicorebridge.ipynb 8
def build_historic_args(data:pd.DataFrame, history:dict|list) -> dict:
"""Create a timeseries DataFrame from historic data defined in `history`.
Parameters
----------
data : pd.DataFrame
The input time-series DataFrame.
history : dict or list of dicts
Historic data definition, each item in the list is a dictionary with a startDate key to set the start of a section of historic data in the result and a column-value pair for each of the columns in the
Returns
-------
historic_data : dict
Historic data in dictionary format where keys are column names and values are the historic values as numpy array.
"""

if not history:
return {}

Expand Down Expand Up @@ -62,10 +77,10 @@ def build_historic_args(data, history):
#return pd.DataFrame(column_data, index=data.index)


# %% ../nbs/01_aicorebridge.ipynb 10
# %% ../nbs/01_aicorebridge.ipynb 12
class AICoreModule(): pass

# %% ../nbs/01_aicorebridge.ipynb 11
# %% ../nbs/01_aicorebridge.ipynb 13
@patch
def __init__(self:AICoreModule,
processor:typing.Callable, # data processing function
Expand All @@ -85,7 +100,7 @@ def __init__(self:AICoreModule,



# %% ../nbs/01_aicorebridge.ipynb 12
# %% ../nbs/01_aicorebridge.ipynb 14
@patch
def _init_processor(
self:AICoreModule,
Expand All @@ -99,14 +114,14 @@ def _init_processor(
self.data_param, *self.call_params = list(self.processor_params.keys())


# %% ../nbs/01_aicorebridge.ipynb 13
# %% ../nbs/01_aicorebridge.ipynb 15
# can be overloaded
@patch
def call_processor(self:AICoreModule, calldata, **callargs):
return self.processor(calldata, **callargs)


# %% ../nbs/01_aicorebridge.ipynb 15
# %% ../nbs/01_aicorebridge.ipynb 17
@patch
def infer(self:AICoreModule, data:dict, *_, **kwargs):
try:
Expand Down Expand Up @@ -175,7 +190,7 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs):
}


# %% ../nbs/01_aicorebridge.ipynb 17
# %% ../nbs/01_aicorebridge.ipynb 19
# Specialized types for initializing annotated parameters
# Add types by adding a tuple with the type name and a builder function
annotated_arg_builders = {
Expand All @@ -184,7 +199,7 @@ def infer(self:AICoreModule, data:dict, *_, **kwargs):
]
}

# %% ../nbs/01_aicorebridge.ipynb 18
# %% ../nbs/01_aicorebridge.ipynb 20
@patch
def init_annotated_param(self:AICoreModule, param_name, value):
"""
Expand All @@ -211,7 +226,7 @@ def init_annotated_param(self:AICoreModule, param_name, value):



# %% ../nbs/01_aicorebridge.ipynb 19
# %% ../nbs/01_aicorebridge.ipynb 21
@patch
def get_callargs(self:AICoreModule, kwargs, history):
"Get arguments for the processor call"
Expand All @@ -237,7 +252,7 @@ def get_callargs(self:AICoreModule, kwargs, history):
}


# %% ../nbs/01_aicorebridge.ipynb 23
# %% ../nbs/01_aicorebridge.ipynb 25
@patch
def get_call_data(
self:AICoreModule,
Expand Down
97 changes: 94 additions & 3 deletions corebridge/rscript.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,104 @@
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/02_rscriptbridge.ipynb.

# %% auto 0
__all__ = []
__all__ = ['install_R_package', 'calc_hash_from_flowobject', 'check_script_inputs', 'check_script_output', 'run_script']

# %% ../nbs/02_rscriptbridge.ipynb 3
# %% ../nbs/02_rscriptbridge.ipynb 4
import json, os
import subprocess
import warnings
import hashlib

from functools import reduce

# %% ../nbs/02_rscriptbridge.ipynb 30
def install_R_package(pkg:str|list):
"""
Checks and if neccesary installs an R package
Parameters
----------
pkg : str|list
name(s) of the package(s)
"""

if isinstance(pkg, str):
pkg = [pkg]

for pkg_i in pkg:
run_script_result = subprocess.run(['Rscript','-e', f"library({pkg_i})"], capture_output=True)
if run_script_result.returncode != 0:
print(f"Installing {pkg_i}")
run_script_result = subprocess.run(['Rscript','-e', f"install.packages({pkg_i}, repos='https://cloud.r-project.org')"], capture_output=True)
else:
print(f"Library {pkg_i} already installed")

print(run_script_result.stderr.decode('UTF-8'))



# %% ../nbs/02_rscriptbridge.ipynb 45
def calc_hash_from_flowobject(flow_object:dict)->str:
return hashlib.md5(repr(flow_object).encode('UTF-8')).hexdigest()

# %% ../nbs/02_rscriptbridge.ipynb 48
def check_script_inputs(flow_object:dict)->bool:
"""
Check if the input files for a script are up-to-date, returns True if up-to-date.
"""

checksum_file = get_save_path(f"input-checksum-{calc_hash_from_flowobject(flow_object)}")
md5_check_result = subprocess.run(
['md5sum', '-c', checksum_file],
cwd=save_dir,
capture_output=True)

return int(md5_check_result.returncode) == 0

# %% ../nbs/02_rscriptbridge.ipynb 51
def check_script_output(flow_object:dict)->bool:
"""
Check if the output files for a script exist, returns True if they all exist.
"""

return all([
os.path.isfile(get_save_path(F))
for F in flow_object['out']
])

# %% ../nbs/02_rscriptbridge.ipynb 54
def run_script(flow_object):
""" Run a script in R
args:
flow_object: dict of flow object
returns:
bool: False if nothing has changed, or an update failed,
and True if a follow-up script might need to be run
"""

# Check if output exists and inputs have not changed and return False if
# output exists and inputs have not changed
if check_script_output(flow_object) and check_script_inputs(flow_object):
return False

# Run script
run_script_result = subprocess.run(
['Rscript', '--vanilla', get_asset_path(flow_object['name'])],
cwd=save_dir,
capture_output=True
)

# check the return code
if run_script_result.returncode:
print(f"Run returned code {run_script_result.returncode}")
print('STDOUT------------\n', run_script_result.stdout.decode('UTF-8'))
print('STDERR------------\n', run_script_result.stderr.decode('UTF-8'))
return False

# check the output
if not check_script_output(flow_object):
print(f"Output not found for {flow_object['name']}")
return False

return check_script_output(flow_object)

0 comments on commit d96cf00

Please sign in to comment.