-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathFIFORedisQueue.py
34 lines (28 loc) · 1021 Bytes
/
FIFORedisQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import redis
import pickle
from time import sleep
from os import getpid
class Queue(object):
"""docstring for Queue"""
def __init__(self, maxsize, name='default_name'):
self.maxsize = maxsize
self.name = name
self.conn = redis.StrictRedis(host='localhost', port=6379, db=0)
self.conn.flushall()
def get(self, block=True, timeout=0):
data = self.conn.blpop(self.name)
return pickle.loads(data[1])
def put(self, data, block=True, timeout=0):
# Wait for get operations to decrease the queue size
# There appears to be a race condition here. Two processes might read the queue size as 99, and then both push.
# Ideally there should be some sort of mutex here to prevent that from happening
while self.qsize() >= self.maxsize:
print(getpid(), "Waiting for queue to free up")
sleep(1)
self.conn.rpush(self.name, pickle.dumps(data))
def qsize(self):
return self.conn.llen(self.name)
def empty(self):
return (self.qsize() == 0)
def full(self):
return (self.qsize() == self.maxsize)