This repository has been archived by the owner on Jun 6, 2022. It is now read-only.

Periodic tasks are being deleted. #26

Open
minsis opened this issue Dec 2, 2020 · 3 comments


minsis commented Dec 2, 2020

I'm running celery beat and a celery worker inside a Docker container. When the container restarts, all the periodic tasks appear to be deleted. I can't tell whether this is an issue with this package or beat doing something weird.

The database entry doesn't have an expire datetime set.

Celery version: 4.4.7

Any ideas on where I could start troubleshooting this? The issue is so obscure that I can't even tell where to look.


minsis commented Dec 2, 2020

After some extensive troubleshooting, I've found that this doesn't happen on PostgreSQL 9.2. On PostgreSQL 12 or 13, though, the entire celery_periodic_task table is cleared out and then re-initialized.


minsis commented Dec 2, 2020

In the logs I can see that the initial read of the database comes back empty, even though the table does contain rows. My guess is that it thinks the table is empty and so rewrites the entire table with the hard-coded built-in Celery task.

[2020-12-02 22:10:43,294: DEBUG/MainProcess] Setting default socket timeout to 30
[2020-12-02 22:10:43,294: INFO/MainProcess] beat: Starting...
[2020-12-02 22:10:43,295: INFO/MainProcess] setup_schedule
[2020-12-02 22:10:43,295: DEBUG/MainProcess] DatabaseScheduler: initial read
[2020-12-02 22:10:43,295: INFO/MainProcess] Writing entries...
[2020-12-02 22:10:43,295: DEBUG/MainProcess] DatabaseScheduler: Fetching database schedule
[2020-12-02 22:10:43,305: DEBUG/MainProcess] Current schedule:

[2020-12-02 22:10:43,325: DEBUG/MainProcess] schedule: <crontab: 0 4 * * * (m/h/d/dM/MY), UTC>
[2020-12-02 22:10:43,338: DEBUG/MainProcess] beat: Ticking with max interval->5.00 seconds
[2020-12-02 22:10:43,350: DEBUG/MainProcess] beat: Waking up in 5.00 seconds.
[2020-12-02 22:10:48,354: DEBUG/MainProcess] beat: Synchronizing schedule...
[2020-12-02 22:10:48,355: INFO/MainProcess] Writing entries...
[2020-12-02 22:10:48,361: DEBUG/MainProcess] beat: Waking up in 5.00 seconds.


minsis commented Mar 25, 2021

I figured out my issue a while ago, so I guess I should comment here. There's a much longer explanation of how all this works, but the summary is this:

Basically, if you're using SQLAlchemy for multiple connections, you can end up with a race condition that depends on the order in which the parts of your app start up. The connection ends up being shared in memory as the same object. Since I use SQLAlchemy for things other than this library, it all depended on how fast this library could start up. You can test this by putting a pause in this library's startup.

The better way to do this is to let sqlalchemy-scheduler do its thing and use a scoped session throughout your app, which gives you multiple connections in a pool and lets SQLAlchemy handle it all for you.

Looks something like this:

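from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

# connection_string, SA_PRE_POOL_PING and SA_PRE_POOL_RECYCLE are defined elsewhere.
_db_engine = create_engine(
    connection_string,
    pool_pre_ping=SA_PRE_POOL_PING,
    pool_recycle=SA_PRE_POOL_RECYCLE,
    poolclass=NullPool
)

# Registry that returns the same session for repeated calls in one thread.
_scope = scoped_session(sessionmaker(bind=_db_engine))

session = _scope()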
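
For anyone wondering why a scoped session helps here: by default, scoped_session keeps one session per thread in a thread-local registry, so repeated calls in the same thread return the same object, while another thread gets its own. Below is a minimal standard-library sketch of that registry idea. It is a toy stand-in, not SQLAlchemy's actual implementation, and the names ThreadLocalRegistry and FakeSession are made up for illustration.

```python
import threading


class ThreadLocalRegistry:
    """Toy stand-in for the registry idea behind scoped_session (not SQLAlchemy code)."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()

    def __call__(self):
        # Build the object lazily, once per thread, then keep returning it.
        try:
            return self._local.value
        except AttributeError:
            self._local.value = self._factory()
            return self._local.value

    def remove(self):
        # Drop this thread's object so the next call builds a fresh one.
        if hasattr(self._local, "value"):
            del self._local.value


class FakeSession:
    """Hypothetical placeholder for a real database session."""


registry = ThreadLocalRegistry(FakeSession)

# Two calls from the same thread.
same_thread_first = registry()
same_thread_second = registry()

# One call from a different thread.
other_thread_result = []
worker = threading.Thread(target=lambda: other_thread_result.append(registry()))
worker.start()
worker.join()

# Discard the main thread's object and ask again.
registry.remove()
after_remove = registry()

print(same_thread_first is same_thread_second)      # same thread: one shared object
print(other_thread_result[0] is same_thread_first)  # other thread: its own object
print(after_remove is same_thread_first)            # after remove(): a fresh object
```

With the real scoped_session, remove() also closes the current session before discarding it, which is why calling it at the end of each unit of work keeps threads from holding on to stale sessions.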

I wrapped all this in a context manager so that I had the option of 'grabbing' a session from the pool, doing my work, and then closing the pooled session. If I needed something more and wanted direct access, I could just create the session object directly from the same class without needing the context manager.

# This gives me the ability to create a direct connection without the scope.
_db_engine_no_pool = create_engine(
    connection_string,
    poolclass=NullPool
)
_no_scope = sessionmaker(bind=_db_engine_no_pool)

class SessionManager:
    """Hands out either a scoped (thread-local) session or a standalone one."""

    def __init__(self, scoped=True):
        self._scoped = scoped
        if scoped:
            self._session = _scope()
        else:
            self._session = _no_scope()

    def __enter__(self):
        return self._session

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Roll back on error, then always release the session.
        if exc_type:
            self._session.rollback()

        self._close_session()

    @property
    def db(self):
        return self._session

    def _close_session(self):
        if self._scoped:
            # Closes the current thread's session and drops it from the registry.
            _scope.remove()
        else:
            self._session.close()

    def close(self):
        self._close_session()

Then you can create a scoped session like this:

with SessionManager() as session:
    # do some stuff
    pass

Or without a scoped session, like this:

session = SessionManager(scoped=False)

# do some stuff with session.db e.g. session.db.add(Object)

session.db.commit()
# then close your session
session.close()
