Periodic tasks are being deleted. #26
So after some extensive troubleshooting, I've found that this doesn't happen on PostgreSQL 9.2. But when I run it on PostgreSQL 12 or 13, the entire `celery_periodic_task` table is cleared out and then re-initialized.

In the logs I can see that the initial read of the database returns empty rows, even though the table is not actually empty. My guess is that because it thinks the rows are empty, it rewrites the entire table with only the hard-coded Celery built-in task.
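For anyone trying to reproduce this, a minimal diagnostic sketch like the following can confirm whether the rows really exist before beat starts. It assumes the default `celery_periodic_task` table name and a `connection_string` placeholder for your own database URL; adjust both for your setup:

```python
# Hedged diagnostic sketch: read the stored periodic tasks directly,
# bypassing the scheduler, to verify the table is not actually empty.
# `connection_string` is a placeholder for your own database URL.
from sqlalchemy import create_engine, text

engine = create_engine(connection_string)
with engine.connect() as conn:
    rows = conn.execute(text("SELECT * FROM celery_periodic_task")).fetchall()
    print(f"{len(rows)} periodic task(s) stored")
    for row in rows:
        print(row)
```

Running this before and after a beat restart should show whether the rows are gone from the database itself or only invisible to the scheduler's initial read.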
I figured out my issue on this a while ago, so I should comment here. There's a much longer explanation of how all this works, but the summary is this: if you're using SQLAlchemy for multiple connections, you end up with race conditions depending on the order in which the parts of your app start, because the connection ends up being shared in memory as the same object. Since I use SQLAlchemy for things other than this library, it all depended on how fast this library could start up. You can test this by putting a pause in the start-up of this library. The better way is to let sqlalchemy-scheduler do its thing and use a scoped session throughout your app, which gives you multiple connections in a pool and lets SQLAlchemy handle it all for you. It looks something like this:

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

# connection_string, SA_PRE_POOL_PING and SA_PRE_POOL_RECYCLE
# come from the app's own configuration.
_db_engine = create_engine(
    connection_string,
    pool_pre_ping=SA_PRE_POOL_PING,
    pool_recycle=SA_PRE_POOL_RECYCLE,
    poolclass=NullPool,
)
_scope = scoped_session(sessionmaker(bind=_db_engine))
session = _scope()
```

I wrote all this into a context manager so that I had the option of 'grabbing' a pooled session, doing my work, then closing the pooled session. If I needed something more direct, I could create the session object from the same module without the context manager.

```python
# This gives me the ability to create a direct connection without the scope.
_db_engine_no_pool = create_engine(
    connection_string,
    poolclass=NullPool,
)
_no_scope = sessionmaker(bind=_db_engine_no_pool)


class SessionManager:
    def __init__(self, scoped=True):
        self._scoped = scoped
        if scoped:
            # Thread-local session from the shared scoped-session registry.
            self._session = _scope()
        else:
            # Independent session on its own engine.
            self._session = _no_scope()

    def __enter__(self):
        return self._session

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            self._session.rollback()
        self._close_session()

    @property
    def db(self):
        return self._session

    def _close_session(self):
        if self._scoped:
            # Dispose of the thread-local session held by the registry.
            _scope.remove()
        else:
            self._session.close()

    def close(self):
        self._close_session()
```

Then you can create a scoped session like:

```python
with SessionManager() as session:
    # do some stuff
    pass
```

Or without a scoped session like:

```python
session = SessionManager(scoped=False)
# do some stuff with session.db, e.g. session.db.add(obj)
session.db.commit()
# then close your session
session.close()
```
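A note on the design, for anyone adapting this: calling `_scope.remove()` rather than `session.close()` on the scoped path is what clears the thread-local registry entry, so the next `SessionManager()` in the same thread gets a fresh session instead of reusing a stale one. That is what avoids the shared-connection race described above. (The `SA_PRE_POOL_PING` and `SA_PRE_POOL_RECYCLE` names are the commenter's own configuration constants, not library values.)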
I'm running celery beat and a celery worker inside a Docker container. When the container restarts, it seems that all the periodic tasks are deleted. I can't tell if this is an issue with this package or with beat doing something weird.
The database entry doesn't have an expire datetime set.
Celery version: 4.4.7
Any ideas on where I could start troubleshooting? This is such an obscure issue that I can't even tell where to begin looking.
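One place to start, assuming you can tolerate verbose logs: turn on SQL statement logging so you can see exactly what the scheduler does to the table on startup. This sketch uses SQLAlchemy's standard logging hooks and `echo` flag; `connection_string` is a placeholder:

```python
# Hedged troubleshooting sketch: surface every SQL statement issued against
# the database, so any DELETE or re-initialization of celery_periodic_task
# shows up in the logs. `connection_string` is a placeholder for your URL.
import logging

from sqlalchemy import create_engine

logging.basicConfig()
logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)

# Or, for a single engine, echo=True prints each statement as it runs:
engine = create_engine(connection_string, echo=True)
```

Pairing that with `celery -A yourapp beat -l debug` (Celery 4.x syntax; `yourapp` is a placeholder) should show whether beat itself issues the deletes or whether the scheduler sees an empty table on its initial read and rewrites it, as described in the comments above.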