Skip to content

Commit

Permalink
cleanup
Browse files (browse the repository at this point in the history)
  • Loading branch information
jonathan-eq committed Nov 20, 2024
1 parent 10689e3 commit 5c8b1c2
Show file tree
Hide file tree
Showing 11 changed files with 14 additions and 88 deletions.
4 changes: 2 additions & 2 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,12 @@ async def handle_reporting(
)
await let_reporters_finish(reporters)
raise ForwardModelRunnerException from oserror
print(f"REPORTERS REPORTED {type(job_status)=}")

message_queue.task_done()
if isinstance(job_status, Finish) and not job_status.success():
print("JONAK HERE")
await let_reporters_finish(reporters)
raise ForwardModelRunnerException

await let_reporters_finish(reporters)


Expand Down
5 changes: 1 addition & 4 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,8 @@ def _create_environment(self) -> Optional[Dict[str, str]]:

async def _run(self) -> "AsyncGenerator[Start | Exited | Running | None]":
start_message = self.create_start_message_and_check_job_files()
print("ACTUALLY RAN")
yield start_message
if not start_message.success():
print("NOT SUCCESS")
return

arg_list = self._build_arg_list()
Expand Down Expand Up @@ -216,7 +214,7 @@ async def _run(self) -> "AsyncGenerator[Start | Exited | Running | None]":
try:
exit_code = await asyncio.wait_for(
proc.wait(), timeout=self.MEMORY_POLL_PERIOD
) # process.wait(timeout=self.MEMORY_POLL_PERIOD)
)
except asyncio.TimeoutError:
potential_exited_msg = (
self.handle_process_timeout_and_create_exited_msg(exit_code, proc)
Expand Down Expand Up @@ -362,7 +360,6 @@ def _check_job_files(self) -> list[str]:

if executable_error := check_executable(self.job_data.get("executable")):
errors.append(executable_error)
print(f"{errors=}")
return errors

def _check_target_file_is_written(
Expand Down
4 changes: 2 additions & 2 deletions src/_ert/forward_model_runner/job_dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ def sigterm_handler(_signo, _stack_frame):


def main():
# os.nice(19)
# signal.signal(signal.SIGTERM, sigterm_handler)
os.nice(19)
signal.signal(signal.SIGTERM, sigterm_handler)
asyncio.run(job_runner_main(sys.argv))


Expand Down
15 changes: 4 additions & 11 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,17 @@ def __init__(self, evaluator_url, token=None, cert_path=None):
self._real_id = None
self._event_queue: asyncio.Queue[events.Event | EventSentinel] = asyncio.Queue()

# seconds until the reporter times out after Finish() was received
self._timeout_timestamp = None
# seconds until the reporter times out after Finish() was received
self._reporter_timeout = 60

self._queue_polling_timeout = 2
self._event_publishing_task = asyncio.create_task(self.async_event_publisher())
self._event_publisher_ready = asyncio.Event()

async def join(self) -> None:
print("called join")
await self._event_publishing_task

async def stop(self) -> None:
print("called stop")
await self._event_queue.put(Event._sentinel)
await self.join()

Expand All @@ -94,7 +91,6 @@ async def async_event_publisher(self):
token=self._token,
cert=self._cert,
) as client:
self._event_publisher_ready.set()
event = None
while (
self._timeout_timestamp is None
Expand All @@ -109,14 +105,14 @@ async def async_event_publisher(self):
)
except asyncio.TimeoutError:
continue

if event is self._sentinel:
self._event_queue.task_done()
break
try:
await client.send(event_to_json(event))
self._event_queue.task_done()
event = None
print("Sent event :)")
except ClientConnectionError as exception:
# Possible intermittent failure, we retry sending the event
logger.error(str(exception))
Expand All @@ -126,10 +122,8 @@ async def async_event_publisher(self):
logger.debug(str(exception))
self._event_queue.task_done()
break
print("TIMED OUT")

async def report(self, msg: Message):
await self._event_publisher_ready.wait()
await self._report(msg)

async def _report(self, msg: Message):
Expand All @@ -143,7 +137,6 @@ async def _report(self, msg: Message):
await self._finished_handler()

async def _dump_event(self, event: events.Event):
print(f"DUMPED EVENT {type(event)=}")
logger.debug(f'Schedule "{type(event)}" for delivery')
await self._event_queue.put(event)

Expand Down Expand Up @@ -205,13 +198,13 @@ async def _finished_handler(self):
seconds=self._reporter_timeout
)

async def _checksum_handler(self, msg: Checksum):
async def _checksum_handler(self, msg: Checksum) -> None:
fm_checksum = ForwardModelStepChecksum(
ensemble=self._ens_id,
real=self._real_id,
checksums={msg.run_path: msg.data},
)
await self._dump_event(fm_checksum)

def cancel(self):
def cancel(self) -> None:
self._event_publishing_task.cancel()
2 changes: 0 additions & 2 deletions src/_ert/forward_model_runner/reporting/file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import functools
import logging
import os
Expand Down Expand Up @@ -42,7 +41,6 @@ def __init__(self):

async def report(self, msg: Message):
fm_step_status = {}
await asyncio.sleep(0)
if msg.job:
logger.debug("Adding message job to status dictionary.")
fm_step_status = self.status_dict["jobs"][msg.job.index]
Expand Down
6 changes: 2 additions & 4 deletions src/_ert/forward_model_runner/reporting/interactive.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from typing import Optional

from _ert.forward_model_runner.reporting.base import Reporter
Expand All @@ -12,7 +11,7 @@

class Interactive(Reporter):
@staticmethod
async def _report(msg: Message) -> Optional[str]:
def _report(msg: Message) -> Optional[str]:
if not isinstance(msg, (Start, Finish)):
return None
if isinstance(msg, Finish):
Expand All @@ -28,8 +27,7 @@ async def _report(msg: Message) -> Optional[str]:
return f"Running job: {msg.job.name()} ... "

async def report(self, msg: Message):
_msg = await self._report(msg)
await asyncio.sleep(0)
_msg = self._report(msg)
if _msg is not None:
print(_msg)

Expand Down
14 changes: 0 additions & 14 deletions src/_ert/forward_model_runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,49 +81,35 @@ async def run(self, names_of_steps_to_run: List[str]) -> None:
f"Available forward_model steps: {[step.name() for step in self.steps]}"
)
await self.put_event(init_message)
await asyncio.sleep(0)
return

await self.put_event(init_message)
await asyncio.sleep(0)
for step in step_queue:
async for status_update in step.run():
await self.put_event(status_update)
await asyncio.sleep(0)
if not status_update.success():
print("JONAK 2")
await self.put_event(
Checksum(checksum_dict={}, run_path=os.getcwd())
)
await asyncio.sleep(0)
await self.put_event(
Finish().with_error(
"Not all forward model steps completed successfully."
)
)
await asyncio.sleep(0)
return
await asyncio.sleep(0)
checksum_dict = self._populate_checksums(self._read_manifest())
print("JONAK 3")
await self.put_event(
Checksum(checksum_dict=checksum_dict, run_path=os.getcwd())
)
await asyncio.sleep(0)
print("YIELDING FINISH")
await self.put_event(Finish())
await asyncio.sleep(0)
print("YIELDED FINISH")
return
except asyncio.CancelledError:
print("JONAK 4")
await self.put_event(Checksum(checksum_dict={}, run_path=os.getcwd()))
await self.put_event(
Finish().with_error(
"Not all forward model steps completed successfully."
)
)
await asyncio.sleep(0)
return

def _set_environment(self):
Expand Down
4 changes: 2 additions & 2 deletions src/ert/gui/simulation/experiment_panel.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def run_experiment(self) -> None:
self._model = model

QApplication.restoreOverrideCursor()
if True: # model.check_if_runpath_exists():
if model.check_if_runpath_exists():
msg_box = QMessageBox(self)
msg_box.setObjectName("RUN_PATH_WARNING_BOX")

Expand All @@ -270,7 +270,7 @@ def run_experiment(self) -> None:
"might be overwritten.\n"
"- Previously generated files might "
"be used if not configured correctly.\n"
# f"- {model.get_number_of_existing_runpaths()} out of {model.get_number_of_active_realizations()} realizations "
f"- {model.get_number_of_existing_runpaths()} out of {model.get_number_of_active_realizations()} realizations "
"are running in existing runpaths.\n"
"Are you sure you want to continue?"
)
Expand Down
1 change: 0 additions & 1 deletion tests/ert/ui_tests/gui/test_missing_runpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def test_missing_runpath_has_isolated_failures(
tmp_path, run_experiment, qtbot, monkeypatch
):
monkeypatch.chdir(tmp_path)
print(f"{tmp_path=}")
write_config(tmp_path, "LOCAL")

def handle_message_box(dialog):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ async def test_report_with_successful_start_message_argument(unused_tcp_port):
assert event.fm_step == "0"
assert os.path.basename(event.std_out) == "stdout"
assert os.path.basename(event.std_err) == "stderr"
reporter._event_publishing_task.cancel()
reporter.cancel()


async def test_report_with_failed_start_message_argument(unused_tcp_port):
Expand Down
45 changes: 0 additions & 45 deletions tests/ert/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
import asyncio
import contextlib
import time
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING

import websockets.server

from _ert.forward_model_runner.client import Client
from _ert.threading import ErtThread
from ert.scheduler.event import FinishedEvent, StartedEvent

if TYPE_CHECKING:
Expand Down Expand Up @@ -71,28 +69,6 @@ async def async_wait_until(condition, timeout, fail_msg, interval=0.1):
raise AssertionError(fail_msg)


def _mock_ws(host, port, messages, delay_startup=0):
loop = asyncio.new_event_loop()
done = loop.create_future()

async def _handler(websocket, path):
while True:
msg = await websocket.recv()
messages.append(msg)
if msg == "stop":
print("SHOULD STOP!")
done.set_result(None)
break

async def _run_server():
await asyncio.sleep(delay_startup)
async with websockets.server.serve(_handler, host, port):
await done

loop.run_until_complete(_run_server())
loop.close()


async def _mock_ws_async(host, port, messages, delay_startup=0):
done = asyncio.Future()

Expand All @@ -109,27 +85,6 @@ async def _handler(websocket, path):
await done


@contextlib.asynccontextmanager
async def _mock_ws_thread(host, port, messages):
mock_ws_thread = ErtThread(
target=partial(_mock_ws, messages=messages),
args=(
host,
port,
),
)
mock_ws_thread.start()
try:
yield
# Make sure to join the thread even if an exception occurs
finally:
url = f"ws://{host}:{port}"
async with Client(url) as client:
await client.send("stop")
mock_ws_thread.join()
messages.pop()


@contextlib.asynccontextmanager
async def _mock_ws_task(host, port, messages, delay_startup=0):
mock_ws_task = asyncio.create_task(
Expand Down

0 comments on commit 5c8b1c2

Please sign in to comment.