Skip to content

Commit

Permalink
code review suggestions
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathan-eq committed Nov 26, 2024
1 parent ce86ea6 commit 1c4959b
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions src/_ert/forward_model_runner/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ async def main(args):
)
reporter_queue: asyncio.Queue[Message] = asyncio.Queue()

done_flag = asyncio.Event()
is_running = True

forward_model_runner_task = asyncio.create_task(
ForwardModelRunner(jobs_data, reporter_queue=reporter_queue).run(
parsed_args.job
)
)
reporting_task = asyncio.create_task(
handle_reporting(reporters, reporter_queue, done_flag)
handle_reporting(reporters, reporter_queue, is_running)
)

def handle_sigterm(*args, **kwargs):
Expand All @@ -166,22 +166,23 @@ def handle_sigterm(*args, **kwargs):

await forward_model_runner_task

done_flag.set()
is_running = False
await reporting_task


async def handle_reporting(
reporters: Sequence[reporting.Reporter],
message_queue: asyncio.Queue[Message],
done: asyncio.Event,
is_running: bool,
):
while not done.is_set() or not message_queue.empty():
while True:
try:
job_status = await asyncio.wait_for(message_queue.get(), timeout=2)
except asyncio.TimeoutError:
continue
logger.info(f"Job status: {job_status}")
if not is_running:
break

logger.info(f"Job status: {job_status}")
for reporter in reporters:
try:
await reporter.report(job_status)
Expand Down

0 comments on commit 1c4959b

Please sign in to comment.