Skip to content

Commit

Permalink
0.2.28
Browse files Browse the repository at this point in the history
  • Loading branch information
fenke committed Aug 21, 2024
1 parent 2337c1f commit efe0478
Show file tree
Hide file tree
Showing 7 changed files with 555 additions and 383 deletions.
2 changes: 1 addition & 1 deletion corebridge/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.25"
__version__ = "0.2.28"
4 changes: 2 additions & 2 deletions corebridge/_modidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,5 @@
'corebridge/rscript.py'),
'corebridge.rscript.run_rscript_nowait': ( 'rscriptbridge.html#run_rscript_nowait',
'corebridge/rscript.py'),
'corebridge.rscript.run_rscript_wait': ( 'rscriptbridge.html#run_rscript_wait',
'corebridge/rscript.py')}}}
'corebridge.rscript.run_rscript_wait': ('rscriptbridge.html#run_rscript_wait', 'corebridge/rscript.py'),
'corebridge.rscript.unpack_assets': ('rscriptbridge.html#unpack_assets', 'corebridge/rscript.py')}}}
11 changes: 8 additions & 3 deletions corebridge/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import pandas as pd

# %% ../nbs/00_core.ipynb 4
def init_console_logging(name=None, level=logging.INFO):
def init_console_logging(name=None, level=logging.INFO, timestamp=True):
'''Setup none-blocking stream handler for sending loggin to the console.'''

# Only if no handlers defined.
Expand All @@ -25,7 +25,12 @@ def init_console_logging(name=None, level=logging.INFO):
console.setLevel(level)

# set a format which is simpler for console use
formatter = logging.Formatter("%(asctime)s %(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s", datefmt='%Y-%m-%dT%H:%M:%S%z')
if timestamp:
formatter = logging.Formatter("%(asctime)s %(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s", datefmt='%Y-%m-%dT%H:%M:%S%z')
else:
formatter = logging.Formatter("%(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s")

#formatter = logging.Formatter("%(asctime)s %(levelname)s\t%(process)d\t%(name)s\t%(filename)s\t%(lineno)d\t%(message)s", datefmt='%Y-%m-%dT%H:%M:%S%z')

# tell the handler to use this format
console.setFormatter(formatter)
Expand All @@ -39,7 +44,7 @@ def init_console_logging(name=None, level=logging.INFO):

# %% ../nbs/00_core.ipynb 5
try:
logging.getLogger(__name__).info(f"Loading {__name__} from {__file__}")
logging.getLogger(__name__).info()(f"Loading {__name__} from {__file__}")
except:
pass

Expand Down
161 changes: 113 additions & 48 deletions corebridge/rscript.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

# %% auto 0
__all__ = ['syslog', 'read_chunk_size', 'RScriptProcess', 'get_asset_path', 'get_rscript_libpath', 'get_save_path',
'get_rscript_env', 'check_rscript_libs', 'check_rscript_lib', 'install_R_package_wait',
'get_rscript_env', 'check_rscript_libs', 'check_rscript_lib', 'install_R_package_wait', 'unpack_assets',
'calc_hash_from_flowobject', 'calc_hash_from_files', 'calc_hash_from_input_files',
'calc_hash_from_data_files', 'check_script_inputs', 'check_script_output', 'generate_checksum_file',
'run_rscript_wait', 'run_rscript_nowait']

# %% ../nbs/02_rscriptbridge.ipynb 5
# %% ../nbs/02_rscriptbridge.ipynb 4
import os, logging, json, hashlib
import fcntl, subprocess

Expand All @@ -16,10 +16,10 @@
from .aicorebridge import AICoreModule
from .core import init_console_logging

# %% ../nbs/02_rscriptbridge.ipynb 7
syslog = init_console_logging(__name__, logging.DEBUG)
# %% ../nbs/02_rscriptbridge.ipynb 6
syslog = init_console_logging(__name__, logging.DEBUG, timestamp=False)

# %% ../nbs/02_rscriptbridge.ipynb 9
# %% ../nbs/02_rscriptbridge.ipynb 8
def get_asset_path(script_name, assets_dir:str):
return os.path.join(assets_dir, script_name)
def get_rscript_libpath(save_dir:str):
Expand All @@ -28,23 +28,42 @@ def get_save_path(datafile_name:str, save_dir:str):
return os.path.join(save_dir, datafile_name)


# %% ../nbs/02_rscriptbridge.ipynb 38
# %% ../nbs/02_rscriptbridge.ipynb 35
def get_rscript_env(libfolder:str):
if os.environ.get('R_LIBS_USER'):
return dict(**os.environ)
else:
return dict(**os.environ, R_LIBS_USER=libfolder)
return dict(**os.environ, R_LIBS_USER=str(libfolder))

# %% ../nbs/02_rscriptbridge.ipynb 40
# %% ../nbs/02_rscriptbridge.ipynb 42
def check_rscript_libs(libs:list, libfolder:str):
"""Quick check if for all the R packages in libs a folder exists in libfolder"""
return all([os.path.exists(os.path.join(libfolder, L)) for L in libs])

def check_rscript_lib(lib:str, libfolder:str):
def check_rscript_lib(lib:str, libfolder:str) -> bool:
"""Checks if a R package is installed in libfolder
Parameters
----------
lib : str
name of the package
libfolder : str
path to the library folder
Returns
-------
bool
True if the package is installed, False otherwise
"""

run_script_result = subprocess.run(['Rscript','-e', f"library({lib})"], env=get_rscript_env(libfolder), capture_output=True)
if run_script_result.returncode != 0:
print('STDERR\n', run_script_result.stderr.decode('UTF-8'))
print('STDOUT\n', run_script_result.stdout.decode('UTF-8'))
return run_script_result.returncode == 0

# %% ../nbs/02_rscriptbridge.ipynb 44
def install_R_package_wait(pkg:str|list, libfolder:str, repo='https://cloud.r-project.org'):
# %% ../nbs/02_rscriptbridge.ipynb 48
def install_R_package_wait(pkg:str|list, workdir:str, repo='https://cloud.r-project.org'):
"""
Checks and if neccesary installs an R package
Expand All @@ -56,10 +75,15 @@ def install_R_package_wait(pkg:str|list, libfolder:str, repo='https://cloud.r-pr

if isinstance(pkg, str):
return install_R_package_wait([pkg], libfolder, repo)


libfolder=os.path.join(workdir, 'libs')
os.makedirs(libfolder, exist_ok=True)
syslog.debug(f"Using libfolder {libfolder} for packages")

env = dict(os.environ)
env['R_LIBS_USER'] = libfolder
env['R_LIBS_USER'] = os.path.abspath(libfolder)
syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")


for pkg_i in pkg: # ['generics', 'timechange', 'rlang', 'stringi'] +
print(f"\nInstalling package {pkg_i}, testing attach ...")
Expand Down Expand Up @@ -88,7 +112,19 @@ def install_R_package_wait(pkg:str|list, libfolder:str, repo='https://cloud.r-pr



# %% ../nbs/02_rscriptbridge.ipynb 60
# %% ../nbs/02_rscriptbridge.ipynb 54
def unpack_assets(assets_dir:str, save_dir:str):
"""
Unpack the assets folder to the save_dir
"""
unpack_result = subprocess.Popen(
['unzip', '-o', '-d', save_dir, os.path.join(assets_dir, '*.zip')],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
return unpack_result

# %% ../nbs/02_rscriptbridge.ipynb 71
read_chunk_size = 1024 * 32
def calc_hash_from_flowobject(flow_object:dict)->str:
'''Calculate a unique hash for a given flow object'''
Expand Down Expand Up @@ -125,7 +161,7 @@ def calc_hash_from_data_files(flow_object:dict, save_dir:str)->str:
return calc_hash_from_files(flow_object['in'] + flow_object['out'], save_dir)


# %% ../nbs/02_rscriptbridge.ipynb 65
# %% ../nbs/02_rscriptbridge.ipynb 76
def check_script_inputs(flow_object:dict, save_dir:str)->bool:
"""
Check if the input files for a script are up-to-date, returns True if up-to-date.
Expand All @@ -139,7 +175,7 @@ def check_script_inputs(flow_object:dict, save_dir:str)->bool:

return int(md5_check_result.returncode) == 0

# %% ../nbs/02_rscriptbridge.ipynb 68
# %% ../nbs/02_rscriptbridge.ipynb 79
def check_script_output(flow_object:dict, save_dir:str)->bool:
"""
Check if the output files for a script exist, returns True if they all exist.
Expand All @@ -150,7 +186,7 @@ def check_script_output(flow_object:dict, save_dir:str)->bool:
for F in flow_object['out']
])

# %% ../nbs/02_rscriptbridge.ipynb 71
# %% ../nbs/02_rscriptbridge.ipynb 82
def generate_checksum_file(flow_object:dict, save_dir:str)->bool:
"""Generates the checksum file for a given flow object"""

Expand All @@ -167,7 +203,7 @@ def generate_checksum_file(flow_object:dict, save_dir:str)->bool:

return md5_encode_result.returncode == 0 and check_script_inputs(flow_object, save_dir)

# %% ../nbs/02_rscriptbridge.ipynb 80
# %% ../nbs/02_rscriptbridge.ipynb 91
def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
""" Run a script in R
args:
Expand Down Expand Up @@ -213,30 +249,36 @@ def run_rscript_wait(flow_object, assets_dir:str, save_dir:str):
return check_script_output(flow_object, save_dir) and generate_checksum_file(flow_object, save_dir)


# %% ../nbs/02_rscriptbridge.ipynb 87
RScriptProcess = namedtuple('RScriptProcess', ['lock_file', 'popen', 'flow_object'])
# %% ../nbs/02_rscriptbridge.ipynb 98
RScriptProcess = namedtuple('RScriptProcess', ['flow_object', 'lock_file', 'stdout','stderr', 'popen_args', 'popen'])

#### Asynchronous RScript processing ------------------------------------------------

# %% ../nbs/02_rscriptbridge.ipynb 89
def run_rscript_nowait(flow_object, assets_dir:str, save_dir:str, pkg_repo:str='https://cloud.r-project.org') -> RScriptProcess:
def run_rscript_nowait(flow_object, workdir:str, pkg_repo:str='https://cloud.r-project.org') -> RScriptProcess:
""" Run a script in R
args:
flow_object: dict of flow object
assets_dir: path to the assets directory
save_dir: path to the save directory
workdir: working directory
pkg_repo: CRAN package repository
returns:
RScriptProcess: Popen container object for the script
"""

syslog.debug(f"Starting script {flow_object['name']}")

# check lockfile ---------------------------------------------------------------
# do lock maintenance
# lockfile -------------------------------------------------------------------
os.makedirs(os.path.join(workdir, 'temp'), exist_ok=True)
def get_temp_path(lname):
return os.path.join(workdir, 'temp', lname)

lock_name = 'run_flow_object-'+calc_hash_from_flowobject(flow_object)

# lock maintenance
if run_rscript_nowait.lock_objects.get(lock_name):
lock_object = run_rscript_nowait.lock_objects[lock_name]
if not lock_object.lock_file.closed:
syslog.debug(f"Lockfile is open for {flow_object['name']} ({lock_name})")
# If the lockfile is open, check if the process is still running

if lock_object.popen is None:
syslog.debug(f"No process running for {flow_object['name']} ({lock_name})")
Expand All @@ -247,63 +289,86 @@ def run_rscript_nowait(flow_object, assets_dir:str, save_dir:str, pkg_repo:str='
syslog.debug(f"Script has finished for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
# since poll return not-None the script has finished so close the lockfile
lock_object.lock_file.close()
#run_rscript_nowait.lock_objects.pop(lock_name)
lock_object.stdout.close()
lock_object.stderr.close()
if lock_object.popen.returncode != 0:
syslog.error(f"Script failed for {flow_object['name']} ({lock_name}), returned {lock_object.popen.returncode}")
syslog.error(f"Args were: {lock_object.popen_args}")
with open(lock_object.stdout.name, 'rb') as so:
syslog.error(f"STDOUT\n{so.read().decode('UTF-8')}")
with open(lock_object.stderr.name, 'rb') as se:
syslog.error(f"STDERR\n{se.read().decode('UTF-8')}")
else:
generate_checksum_file(flow_object, workdir)

#os.remove(lock_object.stdout.name)
#os.remove(lock_object.stderr.name)


# Check if output exists and inputs have not changed and return False if
# output exists and inputs have not changed
if check_script_output(flow_object, save_dir) and check_script_inputs(flow_object, save_dir):
if check_script_output(flow_object, workdir) and check_script_inputs(flow_object, workdir):
syslog.debug(f"Output and inputs are up-to-date for {flow_object['name']}")
return run_rscript_nowait.lock_objects.get(lock_name)

# Create the lock file -----------------------------------------------------------
syslog.debug(f"Preparing to run scripts for {flow_object['name']}, creating lockfile ({lock_name})")
cf = open(get_save_path(f"lock-{lock_name}", save_dir), 'wt')
cf = open(get_temp_path(f"lock-{lock_name}"), 'wt')

try:
# Set lock on lockfile
fcntl.flock(cf, fcntl.LOCK_EX | fcntl.LOCK_NB)

so = open(get_temp_path(f"stdout-{lock_name}"), 'wt')
se = open(get_temp_path(f"stderr-{lock_name}"), 'wt')

# check libs
libfolder=get_rscript_libpath(save_dir)
libfolder=os.path.join(workdir, 'libs')
os.makedirs(libfolder, exist_ok=True)
syslog.debug(f"Using libfolder {libfolder} for packages")

env = dict(os.environ)
env['R_LIBS_USER'] = libfolder

env['R_LIBS_USER'] = os.path.abspath(libfolder)
syslog.debug(F"Using libfolder {env['R_LIBS_USER']} for R_LIBS_USER")

if not check_rscript_libs(flow_object['libs'], libfolder):
syslog.debug(f"Installing libs for {flow_object['name']} ({lock_name}): {flow_object['libs']}")
for pkg_i in flow_object['libs']:
syslog.debug(f"Checking lib {pkg_i} for {flow_object['name']} ({lock_name})")
if not check_rscript_lib(pkg_i, libfolder):
syslog.debug(f"Starting installation of {pkg_i} for {flow_object['name']} ({lock_name})")
run_script_install = subprocess.Popen([
popen_args = [
'Rscript','-e',
f"install.packages('{pkg_i}', repos='{pkg_repo}', lib='{libfolder}', dependencies=TRUE)"
],
cwd=save_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
f"install.packages('{pkg_i}', repos='{pkg_repo}', lib='{libfolder}', dependencies=TRUE)",
]
run_script_install = subprocess.Popen(
popen_args,
cwd=workdir,
stdout=so,
stderr=se,
encoding='UTF-8',
env=env)
run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(cf, run_script_install, flow_object)
env=env,
)
run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, run_script_install)
return run_rscript_nowait.lock_objects.get(lock_name)


syslog.debug(f"Libs are up-to-date, starting script for {flow_object['name']} ({lock_name})")
# run the script
popen = subprocess.Popen(
['Rscript', '--vanilla', get_asset_path(flow_object['name'], assets_dir)],
cwd=save_dir,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
popen_args = ['Rscript', flow_object['name']]
popen_run = subprocess.Popen(
popen_args,
cwd=workdir,
stdout=so,
stderr=se,
encoding='UTF-8',
env=env,
)

run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(cf, popen, flow_object)
run_rscript_nowait.lock_objects[lock_name] = RScriptProcess(flow_object, cf, so, se, popen_args, popen_run)

except BlockingIOError as locked_error:
cf.close()
syslog.error(locked_error)
#syslog.error(f"{flow_object['name']} is locked, cannot run", exc_info=locked_error)

syslog.debug(f"Done with {flow_object['name']}.")

Expand Down
11 changes: 8 additions & 3 deletions nbs/00_core.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"source": [
"#| exports\n",
"\n",
"def init_console_logging(name=None, level=logging.INFO):\n",
"def init_console_logging(name=None, level=logging.INFO, timestamp=True):\n",
" '''Setup none-blocking stream handler for sending loggin to the console.'''\n",
"\n",
" # Only if no handlers defined.\n",
Expand All @@ -63,7 +63,12 @@
" console.setLevel(level)\n",
"\n",
" # set a format which is simpler for console use\n",
" formatter = logging.Formatter(\"%(asctime)s %(levelname)s\\t%(process)d\\t%(name)s\\t%(filename)s\\t%(lineno)d\\t%(message)s\", datefmt='%Y-%m-%dT%H:%M:%S%z')\n",
" if timestamp:\n",
" formatter = logging.Formatter(\"%(asctime)s %(levelname)s\\t%(process)d\\t%(name)s\\t%(filename)s\\t%(lineno)d\\t%(message)s\", datefmt='%Y-%m-%dT%H:%M:%S%z')\n",
" else:\n",
" formatter = logging.Formatter(\"%(levelname)s\\t%(process)d\\t%(name)s\\t%(filename)s\\t%(lineno)d\\t%(message)s\")\n",
" \n",
" #formatter = logging.Formatter(\"%(asctime)s %(levelname)s\\t%(process)d\\t%(name)s\\t%(filename)s\\t%(lineno)d\\t%(message)s\", datefmt='%Y-%m-%dT%H:%M:%S%z')\n",
"\n",
" # tell the handler to use this format\n",
" console.setFormatter(formatter)\n",
Expand All @@ -83,7 +88,7 @@
"source": [
"#| export\n",
"try:\n",
" logging.getLogger(__name__).info(f\"Loading {__name__} from {__file__}\")\n",
" logging.getLogger(__name__).info()(f\"Loading {__name__} from {__file__}\")\n",
"except:\n",
" pass"
]
Expand Down
Loading

0 comments on commit efe0478

Please sign in to comment.