Skip to content

Commit

Permalink
Merge pull request #874 from MolSSI/internal_jobs
Browse files Browse the repository at this point in the history
Improve internal job handling
  • Loading branch information
bennybp authored Jan 2, 2025
2 parents 26e69a0 + fa6d265 commit c166787
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 191 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Add repeat_delay to internal_jobs
Revision ID: e798462e0c03
Revises: 0587bb0220aa
Create Date: 2025-01-02 08:48:50.123907
"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "e798462e0c03"
down_revision = "0587bb0220aa"
branch_labels = None
depends_on = None


def upgrade():
    """Apply this revision: add the nullable ``repeat_delay`` integer column
    to the ``internal_jobs`` table."""
    # ### commands auto generated by Alembic - please adjust! ###
    repeat_delay_col = sa.Column("repeat_delay", sa.Integer(), nullable=True)
    op.add_column("internal_jobs", repeat_delay_col)
    # ### end Alembic commands ###


def downgrade():
    """Revert this revision: drop the ``repeat_delay`` column from the
    ``internal_jobs`` table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column(table_name="internal_jobs", column_name="repeat_delay")
    # ### end Alembic commands ###
7 changes: 5 additions & 2 deletions qcfractal/qcfractal/components/internal_jobs/db_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from typing import TYPE_CHECKING

from sqlalchemy import Column, Integer, TIMESTAMP, String, JSON, Index, Enum, UniqueConstraint, ForeignKey
from sqlalchemy import Column, Integer, TIMESTAMP, String, Index, Enum, UniqueConstraint, ForeignKey
from sqlalchemy import DDL, event
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.orm import relationship

from qcfractal.components.auth.db_models import UserIDMapSubquery, UserORM
Expand Down Expand Up @@ -35,7 +36,9 @@ class InternalJobORM(BaseORM):
kwargs = Column(JSON, nullable=False)

after_function = Column(String, nullable=True)
after_function_kwargs = Column(JSON, nullable=True)
after_function_kwargs = Column(JSON(none_as_null=True), nullable=True)

repeat_delay = Column(Integer, nullable=True)

result = Column(JSON)
user_id = Column(Integer, ForeignKey(UserORM.id, ondelete="cascade"), nullable=True)
Expand Down
121 changes: 72 additions & 49 deletions qcfractal/qcfractal/components/internal_jobs/socket.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import inspect
import logging
import select as io_select
import traceback
Expand Down Expand Up @@ -46,30 +47,18 @@ def __init__(self, root_socket: SQLAlchemySocket):
self._delete_internal_job_frequency = 60 * 60 * 24 # one day
self._internal_job_keep = root_socket.qcf_config.internal_job_keep

# Wait a bit after startup
self.add_internal_job_delete_old_internal_jobs(2.0)

def add_internal_job_delete_old_internal_jobs(self, delay: float, *, session: Optional[Session] = None):
"""
Adds an internal job to delete old, finished internal jobs
"""

# Only add this if we are going to delete some
if self._internal_job_keep <= 0:
return

with self.root_socket.optional_session(session) as session:
self.add(
"delete_old_internal_jobs",
now_at_utc() + timedelta(seconds=delay),
"internal_jobs.delete_old_internal_jobs",
{},
user_id=None,
unique_name=True,
after_function="internal_jobs.add_internal_job_delete_old_internal_jobs",
after_function_kwargs={"delay": self._delete_internal_job_frequency}, # wait one day
session=session,
)
if self._internal_job_keep > 0:
with self.root_socket.session_scope() as session:
self.add(
"delete_old_internal_jobs",
now_at_utc() + timedelta(seconds=2.0),
"internal_jobs.delete_old_internal_jobs",
{},
user_id=None,
unique_name=True,
repeat_delay=self._delete_internal_job_frequency,
session=session,
)

def add(
self,
Expand All @@ -81,6 +70,7 @@ def add(
unique_name: bool = False,
after_function: Optional[str] = None,
after_function_kwargs: Optional[Dict[str, Any]] = None,
repeat_delay: Optional[int] = None,
*,
session: Optional[Session] = None,
) -> int:
Expand All @@ -107,6 +97,8 @@ def add(
When this job is done, call this function
after_function_kwargs
Arguments to use when calling `after_function`
repeat_delay
If set, will submit a new, identical job to be run repeat_delay seconds after this one finishes
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
Expand All @@ -122,48 +114,45 @@ def add(
stmt = insert(InternalJobORM)
stmt = stmt.values(
name=name,
status=InternalJobStatusEnum.waiting,
unique_name=name,
scheduled_date=scheduled_date,
function=function,
kwargs=kwargs,
after_function=after_function,
after_function_kwargs=after_function_kwargs,
status=InternalJobStatusEnum.waiting,
repeat_delay=repeat_delay,
user_id=user_id,
)
stmt = stmt.on_conflict_do_nothing()
stmt = stmt.on_conflict_do_update(
constraint="ux_internal_jobs_unique_name",
set_={
"after_function": after_function,
"after_function_kwargs": after_function_kwargs,
"repeat_delay": repeat_delay,
},
)
stmt = stmt.returning(InternalJobORM.id)
job_id = session.execute(stmt).scalar_one_or_none()

if job_id is None:
if job_id is not None:
self._logger.debug(f"Job with name {name} added or updated - id: {job_id}")
else:
# Nothing was returned, meaning nothing was inserted
self._logger.debug(f"Job with name {name} already found. Not adding")
self._logger.debug(f"Job with name {name} already found. Not adding?")
stmt = select(InternalJobORM.id).where(InternalJobORM.unique_name == name)
job_id = session.execute(stmt).scalar_one_or_none()

if job_id is None:
# Should be very rare (time-of-check to time-of-use condition: was deleted
# after checking for existence but before getting ID)
self._logger.debug(f"Handling job {name} time-of-check to time-of-use condition")
self.add(
name=name,
scheduled_date=scheduled_date,
function=function,
kwargs=kwargs,
user_id=user_id,
unique_name=unique_name,
after_function=after_function,
after_function_kwargs=after_function_kwargs,
session=session,
)

else:
job_orm = InternalJobORM(
name=name,
status=InternalJobStatusEnum.waiting,
scheduled_date=scheduled_date,
function=function,
kwargs=kwargs,
status=InternalJobStatusEnum.waiting,
after_function=after_function,
after_function_kwargs=after_function_kwargs,
repeat_delay=repeat_delay,
user_id=user_id,
)
if unique_name:
Expand Down Expand Up @@ -289,7 +278,7 @@ def delete(self, job_id: int, *, session: Optional[Session] = None):
with self.root_socket.optional_session(session) as session:
session.execute(stmt)

def delete_old_internal_jobs(self, session: Session, job_progress: JobProgress) -> None:
def delete_old_internal_jobs(self, session: Session) -> None:
"""
Deletes old internal jobs (as defined by the configuration)
"""
Expand Down Expand Up @@ -343,7 +332,20 @@ def _run_single(self, session: Session, job_orm: InternalJobORM, logger, job_pro

# Function must be part of the sockets
func = func_attr(self.root_socket)
result = func(**job_orm.kwargs, job_progress=job_progress, session=session)

# We need to determine the parameters of this function
func_params = inspect.signature(func).parameters

add_kwargs = {}

# If the function has a "job_progress" and/or "session" args, pass those in
if "job_progress" in func_params:
add_kwargs["job_progress"] = job_progress

if "session" in func_params:
add_kwargs["session"] = session

result = func(**job_orm.kwargs, **add_kwargs)

# Mark complete, unless this was cancelled
if not job_progress.cancelled():
Expand All @@ -363,6 +365,7 @@ def _run_single(self, session: Session, job_orm: InternalJobORM, logger, job_pro
job_orm.result = result

# Clear the unique name so we can add another one if needed
has_unique_name = job_orm.unique_name is not None
job_orm.unique_name = None

# Flush but don't commit. This will prevent marking the task as finished
Expand All @@ -374,10 +377,30 @@ def _run_single(self, session: Session, job_orm: InternalJobORM, logger, job_pro
if job_orm.status == InternalJobStatusEnum.complete and job_orm.after_function is not None:
after_func_attr = attrgetter(job_orm.after_function)
after_func = after_func_attr(self.root_socket)
after_func(**job_orm.after_function_kwargs, session=session)

after_func_params = inspect.signature(after_func).parameters
add_after_kwargs = {}
if "session" in after_func_params:
add_after_kwargs["session"] = session
after_func(**job_orm.after_function_kwargs, **add_after_kwargs)

if job_orm.status == InternalJobStatusEnum.complete and job_orm.repeat_delay is not None:
self.add(
name=job_orm.name,
scheduled_date=now_at_utc() + timedelta(seconds=job_orm.repeat_delay),
function=job_orm.function,
kwargs=job_orm.kwargs,
user_id=job_orm.user_id,
unique_name=has_unique_name,
after_function=job_orm.after_function,
after_function_kwargs=job_orm.after_function_kwargs,
repeat_delay=job_orm.repeat_delay,
session=session,
)
session.commit()

def _wait_for_job(self, session: Session, logger, conn, end_event) -> Optional[InternalJobORM]:
@staticmethod
def _wait_for_job(session: Session, logger, conn, end_event):
"""
Blocks until a job is possibly available to run
"""
Expand Down
32 changes: 26 additions & 6 deletions qcfractal/qcfractal/components/internal_jobs/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import uuid
from typing import TYPE_CHECKING

import pytest

from qcfractal.components.internal_jobs.db_models import InternalJobORM
from qcfractal.components.internal_jobs.socket import InternalJobSocket
from qcportal.internal_jobs import InternalJobStatusEnum
Expand All @@ -16,19 +18,32 @@


# Add in another function to the internal_jobs socket for testing
def dummmy_internal_job(self, iterations: int, session, job_progress):
def dummy_internal_job(self, iterations: int, session, job_progress):
    """Test helper: a fake internal-job function that reports progress and
    honors cancellation.

    Asserts that ``session`` and ``job_progress`` were both provided
    (non-None), sleeps one second per iteration while updating progress as a
    0-100 percentage, and returns a string describing how the job ended.
    """
    assert session is not None
    assert job_progress is not None
    for i in range(iterations):
        # Sleep to simulate real work so tests can observe the job mid-run
        time.sleep(1.0)
        job_progress.update_progress(100 * ((i + 1) / iterations))
        print("Dummy internal job counter ", i)
        # print("Dummy internal job counter ", i)
        # NOTE(review): the live print above duplicates the commented-out line
        # below it — looks like diff/merge residue; confirm which should remain.

    # A cancelled job returns a distinct marker string instead of the
    # normal completion message
    if job_progress.cancelled():
        return "Internal job cancelled"

    return "Internal job finished"


setattr(InternalJobSocket, "dummy_job", dummmy_internal_job)
# Add in another function to the internal_jobs socket for testing
# This one doesn't have session or job_progress
def dummy_internal_job_2(self, iterations: int):
    """Test helper like ``dummy_internal_job`` but without ``session`` or
    ``job_progress`` parameters.

    Sleeps one second per iteration and always reports success.
    """
    remaining = iterations
    while remaining > 0:
        time.sleep(1.0)
        remaining -= 1

    return "Internal job finished"


# Attach the dummy job functions as methods of InternalJobSocket so the tests
# can submit them as internal jobs by name (e.g. "internal_jobs.dummy_job").
setattr(InternalJobSocket, "dummy_job", dummy_internal_job)
setattr(InternalJobSocket, "dummy_job_2", dummy_internal_job_2)


def test_internal_jobs_socket_add_unique(storage_socket: SQLAlchemySocket):
Expand Down Expand Up @@ -59,9 +74,10 @@ def test_internal_jobs_socket_add_non_unique(storage_socket: SQLAlchemySocket):
assert len({id_1, id_2, id_3}) == 3


def test_internal_jobs_socket_run(storage_socket: SQLAlchemySocket, session: Session):
@pytest.mark.parametrize("job_func", ("internal_jobs.dummy_job", "internal_jobs.dummy_job_2"))
def test_internal_jobs_socket_run(storage_socket: SQLAlchemySocket, session: Session, job_func: str):
id_1 = storage_socket.internal_jobs.add(
"dummy_job", now_at_utc(), "internal_jobs.dummy_job", {"iterations": 10}, None, unique_name=False
"dummy_job", now_at_utc(), job_func, {"iterations": 10}, None, unique_name=False
)

# Faster updates for testing
Expand All @@ -77,9 +93,13 @@ def test_internal_jobs_socket_run(storage_socket: SQLAlchemySocket, session: Ses
try:
job_1 = session.get(InternalJobORM, id_1)
assert job_1.status == InternalJobStatusEnum.running
assert job_1.progress > 10
assert time_0 < job_1.last_updated < time_1

if job_func == "internal_jobs.dummy_job":
assert job_1.progress > 10
else:
assert job_1.progress == 0

time.sleep(8)
time_2 = now_at_utc()

Expand Down
28 changes: 5 additions & 23 deletions qcfractal/qcfractal/components/managers/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
from typing import TYPE_CHECKING

from sqlalchemy import and_, update, select
from sqlalchemy.orm import selectinload, defer, undefer, lazyload, joinedload

from qcfractal.db_socket.helpers import get_query_proj_options, get_count, get_general
from qcportal.exceptions import MissingDataError, ComputeManagerError
from qcportal.exceptions import ComputeManagerError
from qcportal.managers import ManagerStatusEnum, ManagerName, ManagerQueryFilters
from qcportal.utils import now_at_utc
from .db_models import ComputeManagerORM

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session
from qcfractal.db_socket.socket import SQLAlchemySocket
from qcfractal.components.internal_jobs.status import JobProgress
from typing import List, Iterable, Optional, Sequence, Sequence, Dict, Any


Expand All @@ -32,31 +30,15 @@ def __init__(self, root_socket: SQLAlchemySocket):
self._manager_heartbeat_frequency = root_socket.qcf_config.heartbeat_frequency
self._manager_max_missed_heartbeats = root_socket.qcf_config.heartbeat_max_missed

# Add the initial job for checking on managers
self.add_internal_job_check_heartbeats(0.0)

def add_internal_job_check_heartbeats(self, delay: float, *, session: Optional[Session] = None):
"""
Adds an internal job to check for dead managers
Parameters
----------
delay
Schedule for this many seconds in the future
session
An existing SQLAlchemy session to use. If None, one will be created. If an existing session
is used, it will be flushed (but not committed) before returning from this function.
"""
with self.root_socket.optional_session(session) as session:
with self.root_socket.session_scope() as session:
self.root_socket.internal_jobs.add(
"check_manager_heartbeats",
now_at_utc() + timedelta(seconds=delay),
now_at_utc(),
"managers.check_manager_heartbeats",
{},
user_id=None,
unique_name=True,
after_function="managers.add_internal_job_check_heartbeats",
after_function_kwargs={"delay": self._manager_heartbeat_frequency},
repeat_delay=self._manager_heartbeat_frequency,
session=session,
)

Expand Down Expand Up @@ -298,7 +280,7 @@ def query(

return result_dicts

def check_manager_heartbeats(self, session: Session, job_progress: JobProgress) -> None:
def check_manager_heartbeats(self, session: Session) -> None:
"""
Checks for manager heartbeats
Expand Down
Loading

0 comments on commit c166787

Please sign in to comment.