Use scheduler to start Everserver
oyvindeide committed Nov 1, 2024
1 parent 6ec76aa commit a788312
Showing 2 changed files with 21 additions and 86 deletions.
29 changes: 8 additions & 21 deletions src/everest/bin/everest_script.py
@@ -1,25 +1,21 @@
 #!/usr/bin/env python

 import argparse
+import asyncio
 import json
 import logging
 import signal
 import threading
 from functools import partial

-from ert.config import ErtConfig
-from ert.storage import open_storage
 from everest.config import EverestConfig
 from everest.detached import (
     ServerStatus,
     everserver_status,
-    generate_everserver_ert_config,
     server_is_running,
     start_server,
-    wait_for_context,
     wait_for_server,
 )
-from everest.plugins.site_config_env import PluginSiteConfigEnv
 from everest.util import makedirs_if_needed, version_info

 from .utils import (
@@ -48,7 +44,7 @@ def everest_entry(args=None):
         partial(handle_keyboard_interrupt, options=options),
     )

-    run_everest(options)
+    asyncio.run(run_everest(options))


 def _build_args_parser():
@@ -80,7 +76,7 @@ def _build_args_parser():
     return arg_parser


-def run_everest(options):
+async def run_everest(options):
     logger = logging.getLogger("everest_main")
     server_state = everserver_status(options.config)

@@ -100,22 +96,13 @@ def run_everest(options):
         job_name = fm_job.split()[0]
         logger.info("Everest forward model contains job {}".format(job_name))

-    with PluginSiteConfigEnv():
-        ert_config = ErtConfig.with_plugins().from_dict(
-            config_dict=generate_everserver_ert_config(
-                options.config, options.debug
-            )
-        )
-
     makedirs_if_needed(options.config.output_dir, roll_if_exists=True)

-    with open_storage(ert_config.ens_path, "w") as storage, PluginSiteConfigEnv():
-        context = start_server(options.config, ert_config, storage)
-        print("Waiting for server ...")
-        wait_for_server(options.config, timeout=600, context=context)
-        print("Everest server found!")
-        run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs)
-        wait_for_context()
+    await start_server(options.config)
+    print("Waiting for server ...")
+    wait_for_server(options.config, timeout=600)
+    print("Everest server found!")
+    run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs)

     server_state = everserver_status(options.config)
     server_state_info = server_state["message"]
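Taken together, this hunk turns the CLI into a thin synchronous shell around an async workflow: everest_entry hands control to asyncio.run, run_everest awaits the scheduler-based start_server, and the follow-up polling and monitoring stay blocking. A self-contained sketch of that sync-to-async bridge (toy names, not Everest code):

    import asyncio
    import time

    async def start_toy_server() -> None:
        # Stands in for start_server: awaited so scheduler events can be read.
        await asyncio.sleep(0.1)

    def wait_for_toy_server(timeout: int) -> None:
        # Stands in for wait_for_server: a plain blocking poll.
        time.sleep(0.1)

    async def run_toy(timeout: int = 600) -> None:
        await start_toy_server()      # async part: process submission
        wait_for_toy_server(timeout)  # sync part: readiness polling
        print("server found")

    def main() -> None:
        # Synchronous CLI entry point, same shape as everest_entry.
        asyncio.run(run_toy())

    if __name__ == "__main__":
        main()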
78 changes: 13 additions & 65 deletions src/everest/detached/__init__.py
@@ -14,8 +14,10 @@
 from seba_sqlite.exceptions import ObjectNotFoundError
 from seba_sqlite.snapshot import SebaSnapshot

-from ert import BatchContext, BatchSimulator, JobState
 from ert.config import ErtConfig, QueueSystem
+from ert.scheduler import LocalDriver
+from ert.scheduler.driver import FailedSubmit
+from ert.scheduler.event import StartedEvent
 from everest.config import EverestConfig
 from everest.config_keys import ConfigKeys as CK
 from everest.simulator import JOB_FAILURE, JOB_SUCCESS, Status
@@ -56,7 +58,7 @@
 _context = None


-def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
+async def start_server(config: EverestConfig) -> None:
     """
     Start an Everest server running the optimization defined in the config
     """
@@ -78,31 +80,21 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
         log_level=logging.INFO,
     )

-    global _server  # noqa: PLW0603
-    global _context  # noqa: PLW0603
-    if _context and _context.running():
-        raise RuntimeError(
-            "Starting two instances of everest server "
-            "in the same process is not allowed!"
-        )
-
     try:
         _save_running_config(config)
     except (OSError, LookupError) as e:
         logging.getLogger(EVEREST).error(
             "Failed to save optimization config: {}".format(e)
         )

-    experiment = storage.create_experiment(
-        name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}",
-        parameters=[],
-        responses=[],
-    )
-
-    _server = BatchSimulator(ert_config, experiment, {}, [])
-    _context = _server.start("dispatch_server", [(0, {})])
-
-    return _context
+    driver = LocalDriver()
+    try:
+        await driver.submit(0, "everserver", "--config-file", config.config_file)
+    except FailedSubmit as err:
+        raise ValueError(f"Failed to submit Everserver with error: {err}") from err
+    status = await driver.event_queue.get()
+    if not isinstance(status, StartedEvent):
+        raise ValueError(f"Everserver not started as expected, got status: {status}")


 def _save_running_config(config: EverestConfig):
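The new start_server body is the scheduler's submit-then-listen idiom: submit a single realization, then read the driver's event queue until a StartedEvent confirms the child process is up. A minimal standalone sketch of that idiom, assuming the ert.scheduler API exactly as imported in this diff ("sleep" is just a stand-in for everserver):

    import asyncio

    from ert.scheduler import LocalDriver
    from ert.scheduler.driver import FailedSubmit
    from ert.scheduler.event import StartedEvent

    async def launch(executable: str, *args: str) -> None:
        driver = LocalDriver()
        try:
            # iens=0: a single "realization" slot holding the server process.
            await driver.submit(0, executable, *args)
        except FailedSubmit as err:
            raise ValueError(f"Failed to submit: {err}") from err
        # The driver reports process lifecycle events on an asyncio queue.
        event = await driver.event_queue.get()
        if not isinstance(event, StartedEvent):
            raise ValueError(f"Process not started as expected, got: {event}")

    asyncio.run(launch("sleep", "1"))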
@@ -112,21 +104,6 @@ def _save_running_config(config: EverestConfig):
     config.dump(save_config_path)


-def context_stop_and_wait():
-    global _context  # noqa: PLW0602
-    if _context:
-        _context.stop()
-        while _context.running():
-            time.sleep(1)
-
-
-def wait_for_context():
-    global _context  # noqa: PLW0602
-    if _context and _context.running():
-        while _context.running():
-            time.sleep(1)
-
-
 def stop_server(config: EverestConfig, retries: int = 5):
     """
     Stop server if found and it is running.
@@ -155,9 +132,7 @@ def extract_errors_from_file(path: str):
     return re.findall(r"(Error \w+.*)", content)


-def wait_for_server(
-    config: EverestConfig, timeout: int, context: Optional[BatchContext] = None
-) -> None:
+def wait_for_server(config: EverestConfig, timeout: int) -> None:
     """
     Checks everest server has started _HTTP_REQUEST_RETRY times. Waits
     progressively longer between each check.
@@ -178,33 +153,6 @@ def wait_for_server(
             raise SystemExit(
                 "Failed to start Everest with error:\n{}".format(status["message"])
             )
-        # Job queueing may fail:
-        if context is not None and context.has_job_failed(0):
-            job_progress = context.job_progress(0)
-
-            if job_progress is not None:
-                path = context.job_progress(0).steps[0].std_err_file
-                for err in extract_errors_from_file(path):
-                    update_everserver_status(
-                        config, ServerStatus.failed, message=err
-                    )
-                    logging.error(err)
-                raise SystemExit("Failed to start Everest server.")
-            else:
-                try:
-                    state = context.get_job_state(0)
-
-                    if state == JobState.WAITING:
-                        # Job did fail, but is now in WAITING
-                        logging.error(
-                            "Race condition in wait_for_server, job did fail but is now in WAITING"
-                        )
-                except IndexError as e:
-                    # Job is no longer registered in scheduler
-                    logging.error(
-                        f"Race condition in wait_for_server, failed job removed from scheduler\n{e}"
-                    )
-                    raise SystemExit("Failed to start Everest server.") from e

         sleep_time = sleep_time_increment * (2**retry_count)
         time.sleep(sleep_time)
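With the context-inspection branch gone, the wait loop reduces to the exponential backoff visible in the surviving context lines: retry k sleeps sleep_time_increment * 2**k, so the total wait grows geometrically with the retry count. A quick illustration of that schedule (both constants are assumed for the example; their real values live outside the hunk shown above):

    sleep_time_increment = 5  # assumed for illustration
    _HTTP_REQUEST_RETRY = 6   # assumed for illustration

    total = 0
    for retry_count in range(_HTTP_REQUEST_RETRY):
        sleep_time = sleep_time_increment * (2**retry_count)
        total += sleep_time
        print(f"retry {retry_count}: sleep {sleep_time}s (cumulative {total}s)")
    # With these values the sleeps are 5, 10, 20, 40, 80, 160 -> 315s in total.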