Skip to content

Commit

Permalink
add .seconds to timespans
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorerlingsson committed Jan 27, 2025
1 parent 397222e commit 18055c7
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions spec/stream_queue_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ describe LavinMQ::AMQP::StreamQueue do
with_amqp_server do |s|
StreamQueueSpecHelpers.publish(s, queue_name, offset + 1)
offset.times { StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag) }
sleep 0.1
sleep 0.1.seconds

# consume again, should start from last offset automatically
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
Expand All @@ -392,7 +392,7 @@ describe LavinMQ::AMQP::StreamQueue do
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
msg_store.close
sleep 0.1
wait_for { msg_store.@closed }

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
offsets.each_with_index do |offset, i|
Expand Down Expand Up @@ -472,7 +472,7 @@ describe LavinMQ::AMQP::StreamQueue do
StreamQueueSpecHelpers.publish(s, queue_name, 2)
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1
sleep 0.1.seconds

# should consume the same message again since tracking was not saved from last consume
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
Expand All @@ -491,7 +491,7 @@ describe LavinMQ::AMQP::StreamQueue do
# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1
sleep 0.1.seconds

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
Expand All @@ -510,7 +510,7 @@ describe LavinMQ::AMQP::StreamQueue do
# get message without x-stream-offset, tracks offset
msg = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
StreamQueueSpecHelpers.offset_from_headers(msg.properties.headers).should eq 1
sleep 0.1
sleep 0.1.seconds

# consume with x-stream-offset set, should consume the same message again
msg_2 = StreamQueueSpecHelpers.consume_one(s, queue_name, consumer_tag, c_args)
Expand All @@ -531,10 +531,10 @@ describe LavinMQ::AMQP::StreamQueue do
offsets.each_with_index do |offset, i|
msg_store.store_consumer_offset(tag_prefix + i.to_s, offset)
end
sleep 0.1
sleep 0.1.seconds
msg_store.cleanup_consumer_offsets
msg_store.close
sleep 0.1
sleep 0.1.seconds

msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(tag_prefix + 1.to_s).should eq nil
Expand Down Expand Up @@ -599,7 +599,7 @@ describe LavinMQ::AMQP::StreamQueue do
msgs.receive
end

sleep 0.1
sleep 0.1.seconds
data_dir = File.join(s.vhosts["/"].data_dir, Digest::SHA1.hexdigest queue_name)
msg_store = LavinMQ::AMQP::StreamQueue::StreamQueueMessageStore.new(data_dir, nil)
msg_store.last_offset_by_consumer_tag(c_tag).should eq nil
Expand Down

0 comments on commit 18055c7

Please sign in to comment.