Skip to content

Commit

Permalink
realtime option
Browse files Browse the repository at this point in the history
  • Loading branch information
smrchy committed Jul 6, 2018
1 parent c80e42a commit 3e1128f
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 79 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ node_js:
- "4"
- "6"
- "8"
- "10"
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# CHANGELOG for rsmq

## 0.9.0

* Added realtime option for Redis PUB/SUB when a new message is sent to RSMQ
* Added tests for realtime
* Added Travis tests for Node 10
* Updated dependencies (lodash and dev.async)

## 0.8.4

* Added Typescript typings
Expand Down
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ Parameters for RedisSMQ via an *options* object:

* `host` (String): *optional (Default: "127.0.0.1")* The Redis server
* `port` (Number): *optional (Default: 6379)* The Redis port
* `options` (Object): *optional (Default: {})* The Redis [https://github.com/NodeRedis/node_redis#options-object-properties](https://github.com/NodeRedis/node_redis#options-object-properties) `options` object.
* `options` (Object): *optional (Default: {})* The [Redis options](https://github.com/NodeRedis/node_redis#options-object-properties) object.
* `client` (RedisClient): *optional* A existing redis client instance. `host` and `server` will be ignored.
* `ns` (String): *optional (Default: "rsmq")* The namespace prefix used for all keys created by RSMQ
* `realtime` (Boolean): *optional (Default: false)* Enable realtime PUBLISH of new messages (see the [Realtime section](#realtime))


### Create a queue
Expand Down Expand Up @@ -328,6 +329,20 @@ Returns an object:
Disconnect the redis client.
This is only useful if you are using rsmq within a script and want node to be able to exit.

## Realtime

When [initializing](#initialize) RSMQ you can enable the realtime PUBLISH for new messages. On every new message that gets sent to RSQM via `sendMessage` a Redis PUBLISH will be issued to `{{rsmq.ns}:rt:{qname}.

Example for RSMQ with default settings:

* The queue `testQueue` already contains 5 messages.
* A new message is being sent to the queue `testQueue`.
* The following Redis command will be issued: `PUBLISH rsmq:rt:testQueue 6`

### How to use the realtime option

Besides the PUBLISH when a new message is sent to RSMQ nothing else will happen. Your app could use the Redis SUBSCRIBE command to be notified of new messages and issue a `receiveMessage` then. However make sure not to listen with multiple workers for new messages with SUBSCRIBE to prevent multiple simultaneous `receiveMessage` calls.

## Changes

see the [CHANGELOG](https://github.com/smrchy/rsmq/blob/master/CHANGELOG.md)
Expand Down
21 changes: 14 additions & 7 deletions index.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ A Really Simple Message Queue based on Redis
The MIT License (MIT)
Copyright © 2013-2016 Patrick Liess, http://www.tcs.de
Copyright © 2013-2018 Patrick Liess, http://www.tcs.de
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
Expand All @@ -22,16 +22,17 @@ EventEmitter = require( "events" ).EventEmitter

# To create a new instance use:
#
# RedisSMQ = require("redis-simple-message-queue")
# RedisSMQ = require("rsmq")
# rsmq = new RedisSMQ()
#
# Paramenters for RedisSMQ:
#
# * `host` (String): *optional (Default: "127.0.0.1")* The Redis server
# * `port` (Number): *optional (Default: 6379)* The Redis port
# * `options` (Object): *optional (Default: {})* The Redis options object.
# * `client` (RedisClient): *optional* A existing redis client instance. `host` and `server` will be ignored.
# * `client` (RedisClient): *optional* A existing redis client instance. `host` and `server` will be ignored.
# * `ns` (String): *optional (Default: "rsmq")* The namespace prefix used for all keys created by **rsmq**
# * `realtime` (Boolean): *optional (Default: false)* Enable realtime PUBLISH of new messages
##
class RedisSMQ extends EventEmitter

Expand All @@ -43,8 +44,9 @@ class RedisSMQ extends EventEmitter
options: {}
client: null
ns: "rsmq"
realtime: false
, options

@realtime = opts.realtime
@redisns = opts.ns + ":"
if opts.client?.constructor?.name is "RedisClient"
@redis = opts.client
Expand Down Expand Up @@ -487,16 +489,21 @@ class RedisSMQ extends EventEmitter
return

# Ready to store the message
key = "#{@redisns}#{options.qname}"
mc = [
["zadd", "#{@redisns}#{options.qname}", q.ts + options.delay * 1000, q.uid]
["hset", "#{@redisns}#{options.qname}:Q", q.uid, options.message]
["hincrby", "#{@redisns}#{options.qname}:Q", "totalsent", 1]
["zadd", key, q.ts + options.delay * 1000, q.uid]
["hset", "#{key}:Q", q.uid, options.message]
["hincrby", "#{key}:Q", "totalsent", 1]
]
if @realtime
mc.push(["zcard", key])

@redis.multi(mc).exec (err, resp) =>
if err
@_handleError(cb, err)
return
if @realtime
@redis.publish("#{@redisns}rt:#{options.qname}", resp[3])
cb(null, q.uid)
return
return
Expand Down
17 changes: 13 additions & 4 deletions index.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 41 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
},
"dependencies": {
"@types/redis": "^2.8.0",
"lodash": "^4.17.4",
"lodash": "^4.17.10",
"redis": "^2.8.0"
},
"optionalDependencies": {
"hiredis": "^0.5.0"
},
"devDependencies": {
"async": "^2.5.0",
"async": "^2.6.1",
"coffee-script": "^1.12.7",
"mocha": "^4.0.1",
"should": "^13.1.3"
Expand Down
Loading

0 comments on commit 3e1128f

Please sign in to comment.