From 7c14af3667a72e6ccce4f195e182bc240cee196c Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Fri, 17 Jan 2025 16:10:37 +0000 Subject: [PATCH 1/8] fix connection pooling for nonce manager --- docker-compose.yml | 4 +- eventq.gemspec | 2 + lib/eventq/eventq_base/nonce_manager.rb | 73 ++++++++++++++++++++----- lib/eventq/queue_worker.rb | 2 +- spec/eventq_base/nonce_manager_spec.rb | 31 +++++++---- 5 files changed, 84 insertions(+), 28 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index c67cfd3..48645bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,7 +51,7 @@ services: localstack: # changes to multi region support after this version break tests - image: localstack/localstack:latest + image: localstack/localstack:stable container_name: localstack environment: - SQS_ENDPOINT_STRATEGY=off @@ -60,7 +60,7 @@ services: - "8085:8080" - "4566:4566" healthcheck: - test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/health?reload) -ne 200 ]; then exit 1; fi + test: if [ $$(curl -s -o /dev/null -w "%{http_code}" http://localhost:4566/_localstack/health?reload) -ne 200 ]; then exit 1; fi interval: 10s timeout: 5s retries: 10 diff --git a/eventq.gemspec b/eventq.gemspec index f0750e3..9175746 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_development_dependency 'shoulda-matchers' spec.add_development_dependency 'simplecov', '< 0.18.0' + spec.add_development_dependency 'debug' spec.add_dependency 'aws-sdk-core' spec.add_dependency 'aws-sdk-sns' @@ -33,4 +34,5 @@ Gem::Specification.new do |spec| spec.add_dependency 'oj' spec.add_dependency 'openssl' spec.add_dependency 'redlock' + spec.add_dependency 'connection_pool' end diff --git a/lib/eventq/eventq_base/nonce_manager.rb b/lib/eventq/eventq_base/nonce_manager.rb index 12c807c..7c9b0e9 100644 --- a/lib/eventq/eventq_base/nonce_manager.rb +++ b/lib/eventq/eventq_base/nonce_manager.rb @@ -1,10 +1,12 @@ module EventQ class NonceManager - def self.configure(server:,timeout:10000,lifespan:3600) + def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeout: 5) @server_url = server @timeout = timeout @lifespan = lifespan + @pool_size = pool_size + @pool_timeout = pool_timeout end def self.server_url @@ -19,33 +21,51 @@ def self.lifespan @lifespan end - def self.is_allowed?(nonce) - if @server_url == nil - return true + def self.pool_size + @pool_size + end + + def self.pool_timeout + @pool_timeout + end + + def self.lock(nonce) + # act as if successfully locked if not nonce manager configured - makes it a no-op + return true if not_configured? + + successfully_locked = false + with_redis_connection do |conn| + successfully_locked = conn.set(nonce, 1, ex: lifespan, nx: true) end - require 'redlock' - lock = Redlock::Client.new([ @server_url ]).lock(nonce, @timeout) - if lock == false + if !successfully_locked EventQ.log(:info, "[#{self.class}] - Message has already been processed: #{nonce}") - return false end - return true + successfully_locked end + # if the message was successfully procesed, lock for another lifespan length + # so it isn't reprocessed def self.complete(nonce) - if @server_url != nil - Redis.new(url: @server_url).expire(nonce, @lifespan) + return true if not_configured? + + with_redis_connection do |conn| + conn.expire(nonce, lifespan) end - return true + + true end + # if it failed, unlock immediately so that retries can kick in def self.failed(nonce) - if @server_url != nil - Redis.new(url: @server_url).del(nonce) + return true if not_configured? + + with_redis_connection do |conn| + conn.del(nonce) end - return true + + true end def self.reset @@ -53,5 +73,28 @@ def self.reset @timeout = nil @lifespan = nil end + + private + + def self.with_redis_connection + redis_pool.with do |conn| + yield conn + end + end + + def self.redis_pool + @redis_pool ||= begin + require 'connection_pool' + require 'redis' + + ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do + Redis.new(url: @server_url) + end + end + end + + def self.not_configured? + @server_url.nil? || @server_url.empty? + end end end diff --git a/lib/eventq/queue_worker.rb b/lib/eventq/queue_worker.rb index a61b730..1f27f14 100644 --- a/lib/eventq/queue_worker.rb +++ b/lib/eventq/queue_worker.rb @@ -127,7 +127,7 @@ def process_message(block, message, retry_attempts, acceptance_args) EventQ.logger.debug("[#{self.class}] - Message received. Id: #{message.id}. Retry Attempts: #{retry_attempts}") - if (!EventQ::NonceManager.is_allowed?(message.id)) + if (!EventQ::NonceManager.lock(message.id)) EventQ.logger.warn("[#{self.class}] - Duplicate Message received. Id: #{message.id}. Ignoring message.") status = :duplicate return status, message_args diff --git a/spec/eventq_base/nonce_manager_spec.rb b/spec/eventq_base/nonce_manager_spec.rb index 02409b2..17fdc5d 100644 --- a/spec/eventq_base/nonce_manager_spec.rb +++ b/spec/eventq_base/nonce_manager_spec.rb @@ -6,13 +6,17 @@ let(:server_url) { 'redis://server:6379' } let(:timeout) { 5000 } let(:lifespan) { 20000} + let(:pool_size) { 1 } + let(:pool_timeout) { 1 } context 'when all values are specified' do it 'should set the configuration values correctly' do - described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan) + described_class.configure(server: server_url, timeout: timeout, lifespan: lifespan, pool_size: pool_size, pool_timeout: pool_timeout) expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq timeout expect(described_class.lifespan).to eq lifespan + expect(described_class.pool_size).to eq pool_size + expect(described_class.pool_timeout).to eq pool_timeout end end @@ -22,6 +26,8 @@ expect(described_class.server_url).to eq server_url expect(described_class.timeout).to eq 10000 expect(described_class.lifespan).to eq 3600 + expect(described_class.pool_size).to eq 5 + expect(described_class.pool_timeout).to eq 5 end end @@ -31,7 +37,7 @@ end - describe '#is_allowed?' do + describe '#lock' do let(:nonce) { SecureRandom.uuid } context 'when NonceManager has been configured' do @@ -40,13 +46,13 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be false + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be false end end after do @@ -60,15 +66,20 @@ end context 'when a nonce has NOT been used' do it 'should return true' do - expect(described_class.is_allowed?(nonce)).to be true + expect(described_class.lock(nonce)).to be true end end context 'when a nonce has already been used' do it 'should return false' do - described_class.is_allowed?(nonce) - expect(described_class.is_allowed?(nonce)).to be true + described_class.lock(nonce) + expect(described_class.lock(nonce)).to be true end end + + it 'should not attempt to hit redis' do + expect_any_instance_of(Redis).not_to receive(:set) + described_class.lock(nonce) + end end end @@ -78,7 +89,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.complete(nonce)).to eq true @@ -103,7 +114,7 @@ context 'when NonceManager has been configured' do before do described_class.configure(server: ENV.fetch('REDIS_ENDPOINT', 'redis://redis:6379')) - described_class.is_allowed?(nonce) + described_class.lock(nonce) end it 'should extend the expiry of the nonce key' do expect(described_class.failed(nonce)).to eq true From 2b80d9059dab5ebf742c05bdca0f6c7f5748cc28 Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Fri, 17 Jan 2025 16:19:35 +0000 Subject: [PATCH 2/8] remove redlock dependency --- Gemfile | 1 - eventq.gemspec | 1 - lib/eventq.rb | 1 - 3 files changed, 3 deletions(-) diff --git a/Gemfile b/Gemfile index 8619cf8..bb46d0e 100644 --- a/Gemfile +++ b/Gemfile @@ -6,7 +6,6 @@ source 'https://rubygems.org' gemspec gem 'json', '~> 2' -gem 'redlock', '~> 1' platforms :ruby do gem 'bunny' diff --git a/eventq.gemspec b/eventq.gemspec index 9175746..df2ab84 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -33,6 +33,5 @@ Gem::Specification.new do |spec| spec.add_dependency 'concurrent-ruby' spec.add_dependency 'oj' spec.add_dependency 'openssl' - spec.add_dependency 'redlock' spec.add_dependency 'connection_pool' end diff --git a/lib/eventq.rb b/lib/eventq.rb index 2cbfc4b..808724e 100644 --- a/lib/eventq.rb +++ b/lib/eventq.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require 'securerandom' -require 'redlock' require 'class_kit' require 'hash_kit' require 'oj' From 9759c177e11c91d514f69f5bc75a7deac1df4879 Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Fri, 17 Jan 2025 16:29:34 +0000 Subject: [PATCH 3/8] add pool variables to reset --- lib/eventq/eventq_base/nonce_manager.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/eventq/eventq_base/nonce_manager.rb b/lib/eventq/eventq_base/nonce_manager.rb index 7c9b0e9..7779d8c 100644 --- a/lib/eventq/eventq_base/nonce_manager.rb +++ b/lib/eventq/eventq_base/nonce_manager.rb @@ -72,6 +72,8 @@ def self.reset @server_url = nil @timeout = nil @lifespan = nil + @pool_size = nil + @pool_timeout = nil end private From 158a4e2b78d1a9a4f6d68e31817537f2d5ba1bf5 Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Fri, 17 Jan 2025 16:46:36 +0000 Subject: [PATCH 4/8] add redis dependency --- eventq.gemspec | 1 + 1 file changed, 1 insertion(+) diff --git a/eventq.gemspec b/eventq.gemspec index df2ab84..1ca33cf 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -34,4 +34,5 @@ Gem::Specification.new do |spec| spec.add_dependency 'oj' spec.add_dependency 'openssl' spec.add_dependency 'connection_pool' + spec.add_dependency 'redis' end From 47fd318fecb5c6a41c79fbaa10e4b1b141b82bae Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Fri, 17 Jan 2025 18:28:24 +0000 Subject: [PATCH 5/8] move redis dependency into Gemfile - it's only a dependency when using the nonce manager --- Gemfile | 1 + eventq.gemspec | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/Gemfile b/Gemfile index bb46d0e..cfaf4e5 100644 --- a/Gemfile +++ b/Gemfile @@ -6,6 +6,7 @@ source 'https://rubygems.org' gemspec gem 'json', '~> 2' +gem 'redis' platforms :ruby do gem 'bunny' diff --git a/eventq.gemspec b/eventq.gemspec index 1ca33cf..df2ab84 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -34,5 +34,4 @@ Gem::Specification.new do |spec| spec.add_dependency 'oj' spec.add_dependency 'openssl' spec.add_dependency 'connection_pool' - spec.add_dependency 'redis' end From 782e526097693c6cf2d965da227b856ccca84afe Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Mon, 20 Jan 2025 11:10:41 +0000 Subject: [PATCH 6/8] remove debug dev-dependency --- eventq.gemspec | 1 - 1 file changed, 1 deletion(-) diff --git a/eventq.gemspec b/eventq.gemspec index df2ab84..bb85e21 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -23,7 +23,6 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'rspec' spec.add_development_dependency 'shoulda-matchers' spec.add_development_dependency 'simplecov', '< 0.18.0' - spec.add_development_dependency 'debug' spec.add_dependency 'aws-sdk-core' spec.add_dependency 'aws-sdk-sns' From 00600074bbf587a8883d4ab8aea966f417fe78a9 Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Mon, 20 Jan 2025 12:09:11 +0000 Subject: [PATCH 7/8] move redis pool setup into the constructor --- lib/eventq/eventq_base/nonce_manager.rb | 43 ++++++++++++++----------- spec/eventq_base/nonce_manager_spec.rb | 13 ++++++++ 2 files changed, 38 insertions(+), 18 deletions(-) diff --git a/lib/eventq/eventq_base/nonce_manager.rb b/lib/eventq/eventq_base/nonce_manager.rb index 7779d8c..33c9fd6 100644 --- a/lib/eventq/eventq_base/nonce_manager.rb +++ b/lib/eventq/eventq_base/nonce_manager.rb @@ -1,4 +1,6 @@ module EventQ + class NonceManagerNotConfiguredError < StandardError; end + class NonceManager def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeout: 5) @@ -7,6 +9,16 @@ def self.configure(server:,timeout:10000,lifespan:3600, pool_size: 5, pool_timeo @lifespan = lifespan @pool_size = pool_size @pool_timeout = pool_timeout + + @redis_pool = begin + require 'connection_pool' + require 'redis' + + ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do + Redis.new(url: @server_url) + end + end + @configured = true end def self.server_url @@ -31,7 +43,7 @@ def self.pool_timeout def self.lock(nonce) # act as if successfully locked if not nonce manager configured - makes it a no-op - return true if not_configured? + return true if !configured? successfully_locked = false with_redis_connection do |conn| @@ -48,7 +60,7 @@ def self.lock(nonce) # if the message was successfully procesed, lock for another lifespan length # so it isn't reprocessed def self.complete(nonce) - return true if not_configured? + return true if !configured? with_redis_connection do |conn| conn.expire(nonce, lifespan) @@ -59,7 +71,7 @@ def self.complete(nonce) # if it failed, unlock immediately so that retries can kick in def self.failed(nonce) - return true if not_configured? + return true if !configured? with_redis_connection do |conn| conn.del(nonce) @@ -74,29 +86,24 @@ def self.reset @lifespan = nil @pool_size = nil @pool_timeout = nil + @configured = false + @redis_pool.reload(&:close) + end + + def self.configured? + @configured == true end private def self.with_redis_connection - redis_pool.with do |conn| - yield conn + if !configured? + raise NonceManagerNotConfiguredError, 'Unable to checkout redis connection from pool, nonce manager has not been configured. Call .configure on NonceManager.' end - end - - def self.redis_pool - @redis_pool ||= begin - require 'connection_pool' - require 'redis' - ConnectionPool.new(size: @pool_size, timeout: @pool_timeout) do - Redis.new(url: @server_url) - end + @redis_pool.with do |conn| + yield conn end end - - def self.not_configured? - @server_url.nil? || @server_url.empty? - end end end diff --git a/spec/eventq_base/nonce_manager_spec.rb b/spec/eventq_base/nonce_manager_spec.rb index 17fdc5d..432eac4 100644 --- a/spec/eventq_base/nonce_manager_spec.rb +++ b/spec/eventq_base/nonce_manager_spec.rb @@ -134,4 +134,17 @@ end end + # this is a private method, but we want to make sure we correctly raise an error if a consumer tries to use + # it directly to access the underlying connection pool without configuring the nonce manager first + describe '.with_redis_connection' do + context 'when the nonce manager has not been configured' do + before do + described_class.reset + end + + it 'raises error' do + expect { described_class.with_redis_connection { } }.to raise_error(EventQ::NonceManagerNotConfiguredError) + end + end + end end From 12a3bf7ab234e80341573a6fe541c5e70f5852b1 Mon Sep 17 00:00:00 2001 From: Lyle Davis Date: Mon, 20 Jan 2025 13:35:17 +0000 Subject: [PATCH 8/8] bump eventq version --- EVENTQ_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/EVENTQ_VERSION b/EVENTQ_VERSION index ee74734..6aba2b2 100644 --- a/EVENTQ_VERSION +++ b/EVENTQ_VERSION @@ -1 +1 @@ -4.1.0 +4.2.0