diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index 8dc07a9..e7a0f14 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -30,12 +30,10 @@ jobs: --health-timeout 5s --health-retries 10 localstack: - # changes to multi region support after this version break tests - image: localstack/localstack:0.12.16 + image: localstack/localstack:latest env: - SERVICES: sqs,sns - HOSTNAME: localhost - HOSTNAME_EXTERNAL: localhost + SQS_ENDPOINT_STRATEGY: off + LOCALSTACK_HOST: localhost ports: - "8085:8080" - "4566:4566" diff --git a/docker-compose.yml b/docker-compose.yml index 0b44a74..c67cfd3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -51,12 +51,11 @@ services: localstack: # changes to multi region support after this version break tests - image: localstack/localstack:0.12.16 + image: localstack/localstack:latest container_name: localstack environment: - - SERVICES=sqs,sns - - HOSTNAME=localstack - - HOSTNAME_EXTERNAL=localstack + - SQS_ENDPOINT_STRATEGY=off + - LOCALSTACK_HOST=localstack ports: - "8085:8080" - "4566:4566" diff --git a/eventq.gemspec b/eventq.gemspec index a28b636..f0750e3 100644 --- a/eventq.gemspec +++ b/eventq.gemspec @@ -24,9 +24,9 @@ Gem::Specification.new do |spec| spec.add_development_dependency 'shoulda-matchers' spec.add_development_dependency 'simplecov', '< 0.18.0' - spec.add_dependency 'aws-sdk-sqs', '~> 1.65.0' - spec.add_dependency 'aws-sdk-sns', '~> 1.68.0' - spec.add_dependency 'aws-sdk-core', '3.187.0' + spec.add_dependency 'aws-sdk-core' + spec.add_dependency 'aws-sdk-sns' + spec.add_dependency 'aws-sdk-sqs' spec.add_dependency 'bunny' spec.add_dependency 'class_kit' spec.add_dependency 'concurrent-ruby' diff --git a/spec/eventq_aws/integration/aws_queue_worker_spec.rb b/spec/eventq_aws/integration/aws_queue_worker_spec.rb index a29abec..c0f72c5 100644 --- a/spec/eventq_aws/integration/aws_queue_worker_spec.rb +++ b/spec/eventq_aws/integration/aws_queue_worker_spec.rb @@ -1,6 +1,9 @@ require 'spec_helper' RSpec.describe EventQ::Amazon::QueueWorker, integration: true do + include_context 'mock_aws_visibility_timeout' + include_context 'aws_wait_for_message_processed_helper' + let(:queue_worker) { EventQ::QueueWorker.new } let(:queue_client) do @@ -116,9 +119,6 @@ received = false context = nil - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, thread_count: 1, block_process: false, client: queue_client, wait: false) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -127,12 +127,12 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) + wait_for_message_processed - queue_worker.stop expect(received).to eq(true) expect(context).to eq message_context + queue_worker.stop expect(queue_worker.is_running).to eq(false) end @@ -143,9 +143,6 @@ received = false context = nil - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start( subscriber_queue, worker_adapter: subject, @@ -161,12 +158,12 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) + wait_for_message_processed - queue_worker.stop expect(received).to eq(true) expect(context).to eq message_context + queue_worker.stop expect(queue_worker.is_running).to eq(false) end @@ -185,9 +182,6 @@ received = false - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -195,12 +189,11 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) - - queue_worker.stop + wait_for_message_processed expect(received).to eq(true) + queue_worker.stop expect(queue_worker.is_running).to eq(false) end end @@ -216,9 +209,6 @@ received = false - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -226,12 +216,11 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) - - queue_worker.stop + wait_for_message_processed expect(received).to eq(true) + queue_worker.stop expect(queue_worker.is_running).to eq(false) end end @@ -248,9 +237,6 @@ received_count = 0 received_attribute = 0 - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -261,13 +247,13 @@ args.abort = true if received_count != 2 end - sleep(4) - - queue_worker.stop + 2.times { wait_for_message_processed } expect(received).to eq(true) expect(received_count).to eq(2) expect(received_attribute).to eq(1) + + queue_worker.stop expect(queue_worker.is_running).to eq(false) end @@ -296,7 +282,7 @@ end end - sleep(5) + 10.times { wait_for_message_processed } expect(message_count).to eq(10) expect(received_messages.length).to eq(5) @@ -307,7 +293,6 @@ expect(received_messages[4][:events]).to be >= 1 queue_worker.stop - expect(queue_worker.is_running).to eq(false) end @@ -333,11 +318,6 @@ end it 'should receive an event from the subscriber queue and retry it' do - retry_attempt_count = 0 - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -345,21 +325,11 @@ raise 'Fail on purpose to send event to retry queue.' end - sleep(1) - - expect(retry_attempt_count).to eq(1) - - sleep(2) - - expect(retry_attempt_count).to eq(2) - - sleep(3) - - expect(retry_attempt_count).to eq(3) - - sleep(4) - - expect(retry_attempt_count).to eq(4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2) + expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 3) + expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5) queue_worker.stop @@ -367,37 +337,21 @@ end context 'queue.allow_exponential_back_off = true' do - let(:max_retry_delay) { 10_000 } + let(:max_retry_delay) { 20_000 } let(:allow_exponential_back_off) { true } it 'retries received event with an exponential waiting period' do - retry_attempt_count = 0 - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 1, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) - retry_attempt_count = args.retry_attempts + 1 raise 'Fail on purpose to send event to retry queue.' end - sleep(1) - - expect(retry_attempt_count).to eq(1) - - sleep(2) - - expect(retry_attempt_count).to eq(2) - - sleep(4) - - expect(retry_attempt_count).to eq(3) - - sleep(8) - - expect(retry_attempt_count).to eq(4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2) + expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 8) + expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 16) queue_worker.stop @@ -416,21 +370,13 @@ end it 'retries after half the retry delay has passed' do - retry_attempt_count = 0 - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, worker_adapter: subject, wait: false, block_process: false, sleep: 0.5, thread_count: 1, client: queue_client) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) - retry_attempt_count = args.retry_attempts + 1 raise 'Fail on purpose to send event to retry queue.' end - sleep(3) - - expect(retry_attempt_count).to eq(2) + expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 2) queue_worker.stop diff --git a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb index 62af424..e7bd25d 100644 --- a/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb +++ b/spec/eventq_aws/integration/aws_queue_worker_v2_spec.rb @@ -1,6 +1,8 @@ require 'spec_helper' RSpec.describe EventQ::Amazon::QueueWorker, integration: true do +include_context 'mock_aws_visibility_timeout' +include_context 'aws_wait_for_message_processed_helper' let(:queue_worker) { EventQ::QueueWorker.new } @@ -38,9 +40,6 @@ received = false context = nil - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -49,13 +48,12 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) - - queue_worker.stop + wait_for_message_processed expect(received).to eq(true) expect(context).to eq message_context + queue_worker.stop expect(queue_worker.running?).to eq(false) end @@ -74,9 +72,6 @@ received = false - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -84,12 +79,11 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) - - queue_worker.stop + wait_for_message_processed expect(received).to eq(true) + queue_worker.stop expect(queue_worker.running?).to eq(false) end end @@ -105,9 +99,6 @@ received = false - #wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -115,19 +106,17 @@ EventQ.logger.debug { "Message Received: #{event}" } end - sleep(2) - - queue_worker.stop + wait_for_message_processed expect(received).to eq(true) + queue_worker.stop expect(queue_worker.running?).to eq(false) end end end it 'should receive an event from the subscriber queue and retry it (abort).' do - subscriber_queue.retry_delay = 1000 subscriber_queue.allow_retry = true @@ -136,10 +125,7 @@ received = false received_count = 0 - received_attribute = 0; - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) + received_attribute = 0 queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) @@ -153,13 +139,13 @@ end end - sleep(4) - - queue_worker.stop + 2.times { wait_for_message_processed } expect(received).to eq(true) expect(received_count).to eq(2) expect(received_attribute).to eq(1) + + queue_worker.stop expect(queue_worker.running?).to eq(false) end @@ -175,9 +161,6 @@ received_count = 0 received_attribute = 0; - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) @@ -190,13 +173,13 @@ end end - sleep(4) - - queue_worker.stop + 2.times { wait_for_message_processed } expect(received).to eq(true) expect(received_count).to eq(2) expect(received_attribute).to eq(1) + + queue_worker.stop expect(queue_worker.running?).to eq(false) end @@ -226,12 +209,11 @@ end end - sleep(10) + 10.times { wait_for_message_processed } expect(message_count).to eq(10) queue_worker.stop - expect(queue_worker.running?).to eq(false) end @@ -248,36 +230,19 @@ subscription_manager.subscribe(event_type, subscriber_queue) eventq_client.raise_event(event_type, message) - retry_attempt_count = 0 - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) - retry_attempt_count = args.retry_attempts + 1 raise 'Fail on purpose to send event to retry queue.' end - sleep(1) - - expect(retry_attempt_count).to eq(1) - - sleep(2) - - expect(retry_attempt_count).to eq(2) - - sleep(3) - - expect(retry_attempt_count).to eq(3) - - sleep(4) - - expect(retry_attempt_count).to eq(4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 2) + expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 3) + expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 4) + expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 5) queue_worker.stop - expect(queue_worker.running?).to eq(false) end end @@ -297,48 +262,21 @@ subscription_manager.subscribe(event_type, subscriber_queue) eventq_client.raise_event(event_type, message) - retry_attempt_count = 0 - - # wait 1 second to allow the message to be sent and broadcast to the queue - sleep(1) - queue_worker.start(subscriber_queue, { worker_adapter: subject, wait: false, block_process: false, client: queue_client }) do |event, args| expect(event).to eq(message) expect(args).to be_a(EventQ::MessageArgs) - retry_attempt_count = args.retry_attempts + 1 raise 'Fail on purpose to send event to retry queue.' end - sleep(1) - - expect(retry_attempt_count).to eq(1) - - sleep(1) - - expect(retry_attempt_count).to eq(2) - - sleep(1) - - expect(retry_attempt_count).to eq(3) - - sleep(1) - - expect(retry_attempt_count).to eq(4) - - sleep(1) - - expect(retry_attempt_count).to eq(5) - - sleep(2) - - expect(retry_attempt_count).to eq(6) - - sleep(3) - - expect(retry_attempt_count).to eq(7) + expect(aws_visibility_timeout_queue.pop).to eq(call: 1, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 2, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 3, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 4, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 5, visibility_timeout: 1) + expect(aws_visibility_timeout_queue.pop).to eq(call: 6, visibility_timeout: 2) + expect(aws_visibility_timeout_queue.pop).to eq(call: 7, visibility_timeout: 3) queue_worker.stop - expect(queue_worker.running?).to eq(false) end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index c958e97..0e257bf 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -20,10 +20,13 @@ require_relative '../lib/eventq/rabbitmq' require_relative '../lib/eventq/aws' -RSpec.configure do |config| +require_relative 'support' - config.before(:each) do +RSpec.configure do |config| + config.before(:all) do + # Set logging level to FATAL to prevent showing retry ERROR logs etc during test runs + EventQ.logger.level = Logger::FATAL end config.expect_with :rspec do |expectations| diff --git a/spec/support.rb b/spec/support.rb new file mode 100644 index 0000000..43b443a --- /dev/null +++ b/spec/support.rb @@ -0,0 +1 @@ +require_relative './support/shared_aws_context' diff --git a/spec/support/shared_aws_context.rb b/spec/support/shared_aws_context.rb new file mode 100644 index 0000000..b56a937 --- /dev/null +++ b/spec/support/shared_aws_context.rb @@ -0,0 +1,39 @@ +RSpec.shared_context 'aws_wait_for_message_processed_helper' do + let(:aws_message_processed_queue) { Queue.new } + + # Ensure spec breaks after 10 seconds to prevent waiting for next result forever + around do |example| + Timeout::timeout(10) { example.run } + end + + # The method `untag_processing_thread` is called after each time a message is processed + before do + allow(subject).to receive(:untag_processing_thread).and_wrap_original do |original_method, *args, &block| + aws_message_processed_queue.push(true) + original_method.call(*args, &block) + end + end + + # Waits for result on the queue and time out after 10 seconds due to the around block + def wait_for_message_processed + aws_message_processed_queue.pop + end +end + +RSpec.shared_context 'mock_aws_visibility_timeout' do + let(:aws_visibility_timeout_queue) { Queue.new } + + # Ensure spec breaks after 5 seconds to prevent waiting for next result forever + around do |example| + Timeout::timeout(5) { example.run } + end + + before do + # Store visibility timeout for checking expectations, but return timeout of zero to speed up specs + allow(subject).to receive(:calculate_visibility_timeout).and_wrap_original do |original_method, *args, &block| + result = original_method.call(*args, &block) + aws_visibility_timeout_queue.push(call: args.first, visibility_timeout: result) + 0 + end + end +end