diff --git a/EVENTQ_VERSION b/EVENTQ_VERSION index ee74734..6aba2b2 100644 --- a/EVENTQ_VERSION +++ b/EVENTQ_VERSION @@ -1 +1 @@ -4.1.0 +4.2.0 diff --git a/Gemfile b/Gemfile index 8619cf8..cfaf4e5 100644 --- a/Gemfile +++ b/Gemfile @@ -6,7 +6,7 @@ source 'https://rubygems.org' gemspec gem 'json', '~> 2' -gem 'redlock', '~> 1' +gem 'redis' platforms :ruby do gem 'bunny' diff --git a/docker-compose.yml b/docker-compose.yml index c67cfd3..48645bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,7 +51,7 @@ services: localstack: # changes to multi region support after this version break tests - image: localstack/localstack:latest + image: localstack/localstack:stable container_name: localstack environment: - SQS_ENDPOINT_STRATEGY=off @@ -60,7 +60,7 @@ services: - "8085:8080" - "4566:4566" healthcheck: - test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/health?reload) -ne 200 ]; then exit 1; fi + test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/_localstack/health?reload) -ne 200 ]; then exit 1; fi interval: 10s timeout: 5s retries: 10 diff --git a/eventq.gemspec b/eventq.gemspec index f0750e3..bb85e21 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -32,5 +32,5 @@ Gem::Specification.new do |spec| spec.add_dependency 'concurrent-ruby' spec.add_dependency 'oj' spec.add_dependency 'openssl' - spec.add_dependency 'redlock' + spec.add_dependency 'connection_pool' end diff --git a/lib/eventq.rb b/lib/eventq.rb index 2cbfc4b..808724e 100644 --- a/lib/eventq.rb +++ b/lib/eventq.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require 'securerandom' -require 'redlock' require 'class_kit' require 'hash_kit' require 'oj' diff --git a/lib/eventq/eventq_base/nonce_manager.rb b/lib/eventq/eventq_base/nonce_manager.rb index 12c807c..33c9fd6 100644 --- a/lib/eventq/eventq_base/nonce_manager.rb +++ b/lib/eventq/eventq_base/nonce_manager.rb @@ -1,10 +1,24 @@ module EventQ + class NonceManagerNotConfiguredError < StandardError; end + class NonceManager - def self.configure(server:,timeout:10000,lifespan:3600) + def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeout: 5) @server_url = server @timeout = timeout @lifespan = lifespan + @pool_size = pool_size + @pool_timeout = pool_timeout + + @redis_pool = begin + require 'connection_pool' + require 'redis' + + ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do + Redis.new(url: @server_url) + end + end + @configured = true end def self.server_url @@ -19,39 +33,77 @@ def self.lifespan @lifespan end - def self.is_allowed?(nonce) - if @server_url == nil - return true + def self.pool_size + @pool_size + end + + def self.pool_timeout + @pool_timeout + end + + def self.lock(nonce) + # act as if successfully locked if not nonce manager configured - makes it a no-op + return true if !configured? + + successfully_locked = false + with_redis_connection do |conn| + successfully_locked = conn.set(nonce, 1, ex: lifespan, nx: true) end - require 'redlock' - lock = Redlock::Client.new([ @server_url ]).lock(nonce, @timeout) - if lock == false + if !successfully_locked EventQ.log(:info, "[#{self.class}] - Message has already been processed: #{nonce}") - return false end - return true + successfully_locked end + # if the message was successfully procesed, lock for another lifespan length + # so it isn't reprocessed def self.complete(nonce) - if @server_url != nil - Redis.new(url: @server_url).expire(nonce, @lifespan) + return true if !configured? + + with_redis_connection do |conn| + conn.expire(nonce, lifespan) end - return true + + true end + # if it failed, unlock immediately so that retries can kick in def self.failed(nonce) - if @server_url != nil - Redis.new(url: @server_url).del(nonce) + return true if !configured? + + with_redis_connection do |conn| + conn.del(nonce) end - return true + + true end def self.reset @server_url = nil @timeout = nil @lifespan = nil + @pool_size = nil + @pool_timeout = nil + @configured = false + @redis_pool.reload(&:close) + end + + def self.configured? + @configured == true + end + + private + + def self.with_redis_connection + if !configured? + raise NonceManagerNotConfiguredError, 'Unable to checkout redis connection from pool, nonce manager has not been configured. Call .configure on NonceManager.' + end + + @redis_pool.with do |conn| + yield conn + end end end end diff --git a/lib/eventq/queue_worker.rb b/lib/eventq/queue_worker.rb index a61b730..1f27f14 100644 --- a/lib/eventq/queue_worker.rb +++ b/lib/eventq/queue_worker.rb @@ -127,7 +127,7 @@ def process_message(block, message, retry_attempts, acceptance_args) EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}") - if (!EventQ::NonceManager.is_allowed?(message.id)) + if (!EventQ::NonceManager.lock(message.id)) EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.") status = :duplicate return status, message_args diff --git a/spec/eventq_base/nonce_manager_spec.rb b/spec/eventq_base/nonce_manager_spec.rb index 02409b2..432eac4 100644 --- a/spec/eventq_base/nonce_manager_spec.rb +++ b/spec/eventq_base/nonce_manager_spec.rb @@ -6,13 +6,17 @@ let(:server_url) { 'redis://server:6379' } let(:timeout) { 5000 } let(:lifespan) { 20000} + let(:pool_size) { 1 } + let(:pool_timeout) { 1 } context 'when all values are specified' do it 'should set the configuration values correctly' do - described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan) + described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan, pool_size: pool_size, pool_timeout: pool_timeout) expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq timeout expect(described_class.lifespan).to eq lifespan + expect(described_class.pool_size).to eq pool_size + expect(described_class.pool_timeout).to eq pool_timeout end end @@ -22,6 +26,8 @@ expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq 10000 expect(described_class.lifespan).to eq 3600 + expect(described_class.pool_size).to eq 5 + expect(described_class.pool_timeout).to eq 5 end end @@ -31,7 +37,7 @@ end - describe '#is_allowed?' do + describe '#lock' do let(:nonce) { SecureRandom.uuid } context 'when NonceManager has been configured' do @@ -40,13 +46,13 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be false + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be false end end after do @@ -60,15 +66,20 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be true + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be true end end + + it 'should not attempt to hit redis' do + expect_any_instance_of(Redis).not_to receive(:set) + described_class.lock(nonce) + end end end @@ -78,7 +89,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.complete(nonce)).to eq true @@ -103,7 +114,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.failed(nonce)).to eq true @@ -123,4 +134,17 @@ end end + # this is a private method, but we want to make sure we correctly raise an error if a consumer tries to use + # it directly to access the underlying connection pool without configuring the nonce manager first + describe '.with_redis_connection' do + context 'when the nonce manager has not been configured' do + before do + described_class.reset + end + + it 'raises error' do + expect { described_class.with_redis_connection { } }.to raise_error(EventQ::NonceManagerNotConfiguredError) + end + end + end end