Skip to content

Commit

Permalink
Use scheduler to start Everserver
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Nov 1, 2024
1 parent 6ec76aa commit 1bca9a5
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 84 deletions.
25 changes: 6 additions & 19 deletions src/everest/bin/everest_script.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
#!/usr/bin/env python

import argparse
import asyncio
import json
import logging
import signal
import threading
from functools import partial

from ert.config import ErtConfig
from ert.storage import open_storage
from everest.config import EverestConfig
from everest.detached import (
ServerStatus,
everserver_status,
generate_everserver_ert_config,
server_is_running,
start_server,
wait_for_context,
wait_for_server,
)
from everest.plugins.site_config_env import PluginSiteConfigEnv
from everest.util import makedirs_if_needed, version_info

from .utils import (
Expand Down Expand Up @@ -100,22 +96,13 @@ def run_everest(options):
job_name = fm_job.split()[0]
logger.info("Everest forward model contains job {}".format(job_name))

with PluginSiteConfigEnv():
ert_config = ErtConfig.with_plugins().from_dict(
config_dict=generate_everserver_ert_config(
options.config, options.debug
)
)

makedirs_if_needed(options.config.output_dir, roll_if_exists=True)

with open_storage(ert_config.ens_path, "w") as storage, PluginSiteConfigEnv():
context = start_server(options.config, ert_config, storage)
print("Waiting for server ...")
wait_for_server(options.config, timeout=600, context=context)
print("Everest server found!")
run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs)
wait_for_context()
asyncio.run(start_server(options.config))
print("Waiting for server ...")
wait_for_server(options.config, timeout=600)
print("Everest server found!")
run_detached_monitor(options.config, show_all_jobs=options.show_all_jobs)

server_state = everserver_status(options.config)
server_state_info = server_state["message"]
Expand Down
73 changes: 8 additions & 65 deletions src/everest/detached/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from seba_sqlite.exceptions import ObjectNotFoundError
from seba_sqlite.snapshot import SebaSnapshot

from ert import BatchContext, BatchSimulator, JobState
from ert.config import ErtConfig, QueueSystem
from ert.scheduler import LocalDriver
from everest.config import EverestConfig
from everest.config_keys import ConfigKeys as CK
from everest.simulator import JOB_FAILURE, JOB_SUCCESS, Status
Expand Down Expand Up @@ -56,7 +56,7 @@
_context = None


def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
async def start_server(config: EverestConfig):
"""
Start an Everest server running the optimization defined in the config
"""
Expand All @@ -78,31 +78,18 @@ def start_server(config: EverestConfig, ert_config: ErtConfig, storage):
log_level=logging.INFO,
)

global _server # noqa: PLW0603
global _context # noqa: PLW0603
if _context and _context.running():
raise RuntimeError(
"Starting two instances of everest server "
"in the same process is not allowed!"
)

try:
_save_running_config(config)
except (OSError, LookupError) as e:
logging.getLogger(EVEREST).error(
"Failed to save optimization config: {}".format(e)
)

experiment = storage.create_experiment(
name=f"DetachedEverest@{datetime.now().strftime('%Y-%m-%d@%H:%M:%S')}",
parameters=[],
responses=[],
)

_server = BatchSimulator(ert_config, experiment, {}, [])
_context = _server.start("dispatch_server", [(0, {})])

return _context
driver = LocalDriver()
await driver.submit(0, "everserver", "--config-file", config.config_file)
status = await driver.event_queue.get()
print(status)
return driver


def _save_running_config(config: EverestConfig):
Expand All @@ -112,21 +99,6 @@ def _save_running_config(config: EverestConfig):
config.dump(save_config_path)


def context_stop_and_wait():
    """Ask the running everserver batch context to stop, then block until it exits.

    No-op when no context has been started in this process. Polls the
    context once per second until it reports it is no longer running.
    """
    global _context  # noqa: PLW0602
    if not _context:
        return
    _context.stop()
    # Busy-wait with a coarse 1s poll; stop() is asynchronous.
    while _context.running():
        time.sleep(1)


def wait_for_context():
    """Block until the everserver batch context (if any) finishes running.

    Returns immediately when no context exists or it is already stopped;
    otherwise polls the context once per second until it completes.
    """
    global _context  # noqa: PLW0602
    ctx = _context
    if ctx is None or not ctx.running():
        return
    # Coarse 1s polling loop; the context transitions to not-running on its own.
    while ctx.running():
        time.sleep(1)


def stop_server(config: EverestConfig, retries: int = 5):
"""
Stop server if found and it is running.
Expand Down Expand Up @@ -155,9 +127,7 @@ def extract_errors_from_file(path: str):
return re.findall(r"(Error \w+.*)", content)


def wait_for_server(
config: EverestConfig, timeout: int, context: Optional[BatchContext] = None
) -> None:
def wait_for_server(config: EverestConfig, timeout: int) -> None:
"""
Checks everest server has started _HTTP_REQUEST_RETRY times. Waits
progressively longer between each check.
Expand All @@ -178,33 +148,6 @@ def wait_for_server(
raise SystemExit(
"Failed to start Everest with error:\n{}".format(status["message"])
)
# Job queueing may fail:
if context is not None and context.has_job_failed(0):
job_progress = context.job_progress(0)

if job_progress is not None:
path = context.job_progress(0).steps[0].std_err_file
for err in extract_errors_from_file(path):
update_everserver_status(
config, ServerStatus.failed, message=err
)
logging.error(err)
raise SystemExit("Failed to start Everest server.")
else:
try:
state = context.get_job_state(0)

if state == JobState.WAITING:
# Job did fail, but is now in WAITING
logging.error(
"Race condition in wait_for_server, job did fail but is now in WAITING"
)
except IndexError as e:
# Job is no longer registered in scheduler
logging.error(
f"Race condition in wait_for_server, failed job removed from scheduler\n{e}"
)
raise SystemExit("Failed to start Everest server.") from e

sleep_time = sleep_time_increment * (2**retry_count)
time.sleep(sleep_time)
Expand Down

0 comments on commit 1bca9a5

Please sign in to comment.