Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix: deadlock when stopping follower #927

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,71 @@ describe LavinMQ::Clustering::Client do
fail("election campaign did not finish in time, leadership not released on launcher stop?")
end
end

it "wont deadlock under high load when a follower disconnects [#926]" do
LavinMQ::Config.instance.clustering_max_unsynced_actions = 1
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")

client_io = TCPSocket.new("localhost", tcp_server.local_address.port)
client_io.write LavinMQ::Clustering::Start
client_io.write_bytes replicator.password.bytesize.to_u8, IO::ByteFormat::LittleEndian
client_io.write replicator.password.to_slice
client_io.read_byte
client_io.write_bytes 2i32, IO::ByteFormat::LittleEndian
client_io.flush
client_lz4 = Compress::LZ4::Reader.new(client_io)
# Two full syncs
sha1_size = Digest::SHA1.new.digest_size
2.times do
loop do
filename_len = client_lz4.read_bytes Int32, IO::ByteFormat::LittleEndian
break if filename_len.zero?
client_lz4.skip filename_len
client_lz4.skip sha1_size
end
client_io.write_bytes 0i32
client_io.flush
end

appended = Channel(Bool).new
spawn do
# Fill the action queue
loop do
replicator.append("path", 1)
appended.send true
rescue Channel::ClosedError
break
end
end

# Wait for the action queue to fill up
loop do
select
when appended.receive?
when timeout 0.1.seconds
# @action is a Channel. Let's look at its internal deque
action_queue = replicator.@followers.first.@actions.@queue.not_nil!("no deque? no follower?")
break if action_queue.size == action_queue.@capacity # full?
end
end

# Now disconnect the follower. Our "fill action queue" fiber should continue
client_io.close

select
when appended.receive?
when timeout 0.1.seconds
replicator.@followers.first.@actions.close
deadlock = true
end

appended.close
if deadlock
fail "deadlock detected"
end
ensure
replicator.try &.close
end
end
1 change: 1 addition & 0 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ module LavinMQ
sync(sent_bytes)
end
ensure
@actions.close
@running.done
end

Expand Down
Loading