diff --git a/src/lavinmq/amqp/exchange/consistent_hash.cr b/src/lavinmq/amqp/exchange/consistent_hash.cr index 8c856664d0..eb27c4a8b7 100644 --- a/src/lavinmq/amqp/exchange/consistent_hash.cr +++ b/src/lavinmq/amqp/exchange/consistent_hash.cr @@ -43,12 +43,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) + def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) key = hash_key(routing_key, headers) if d = @hasher.get(key) - {d}.each - else - Iterator(Destination).empty + yield d end end diff --git a/src/lavinmq/amqp/exchange/default.cr b/src/lavinmq/amqp/exchange/default.cr index b39600d6bd..81936b6005 100644 --- a/src/lavinmq/amqp/exchange/default.cr +++ b/src/lavinmq/amqp/exchange/default.cr @@ -11,11 +11,9 @@ module LavinMQ Iterator(BindingDetails).empty end - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) if q = @vhost.queues[routing_key]? - Tuple(LavinMQ::Destination).new(q).each - else - Iterator(LavinMQ::Destination).empty + yield q end end diff --git a/src/lavinmq/amqp/exchange/direct.cr b/src/lavinmq/amqp/exchange/direct.cr index 86c93a12d4..c00b043e16 100644 --- a/src/lavinmq/amqp/exchange/direct.cr +++ b/src/lavinmq/amqp/exchange/direct.cr @@ -41,8 +41,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings[routing_key].each + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) + @bindings[routing_key].each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/exchange.cr b/src/lavinmq/amqp/exchange/exchange.cr index 5f140d2d30..c5acf8d5a8 100644 --- a/src/lavinmq/amqp/exchange/exchange.cr +++ b/src/lavinmq/amqp/exchange/exchange.cr @@ -184,6 +184,7 @@ module LavinMQ abstract def bind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def unbind(destination : AMQP::Destination, routing_key : String, headers : AMQP::Table?) abstract def bindings_details : Iterator(BindingDetails) + abstract def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) def publish(msg : Message, immediate : Bool, queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, @@ -232,7 +233,7 @@ module LavinMQ queues : Set(LavinMQ::Queue) = Set(LavinMQ::Queue).new, exchanges : Set(LavinMQ::Exchange) = Set(LavinMQ::Exchange).new) : Nil return unless exchanges.add? self - bindings(routing_key, headers).each do |d| + each_destination(routing_key, headers) do |d| case d in LavinMQ::Queue queues.add(d) diff --git a/src/lavinmq/amqp/exchange/fanout.cr b/src/lavinmq/amqp/exchange/fanout.cr index 5ef9e9573f..73ca9a5cd1 100644 --- a/src/lavinmq/amqp/exchange/fanout.cr +++ b/src/lavinmq/amqp/exchange/fanout.cr @@ -33,8 +33,10 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - @bindings.each + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) + @bindings.each do |destination| + yield destination + end end end end diff --git a/src/lavinmq/amqp/exchange/headers.cr b/src/lavinmq/amqp/exchange/headers.cr index 22bbf22ee4..1cf11e04e7 100644 --- a/src/lavinmq/amqp/exchange/headers.cr +++ b/src/lavinmq/amqp/exchange/headers.cr @@ -51,10 +51,6 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(Destination) - matches(headers).each - end - private def validate!(headers) : Nil if h = headers if match = h["x-match"]? @@ -65,19 +61,26 @@ module LavinMQ end end - private def matches(headers) : Iterator(Destination) - @bindings.each.select do |args, _| + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) + @bindings.each do |args, destinations| if headers.nil? || headers.empty? - args.empty? + next unless args.empty? + destinations.each do |destination| + yield destination + end else - case args["x-match"]? - when "any" - args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } - else - args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + is_match = case args["x-match"]? + when "any" + args.any? { |k, v| !k.starts_with?("x-") && (headers.has_key?(k) && headers[k] == v) } + else + args.all? { |k, v| k.starts_with?("x-") || (headers.has_key?(k) && headers[k] == v) } + end + next unless is_match + destinations.each do |destination| + yield destination end end - end.flat_map { |_, v| v.each } + end end end end diff --git a/src/lavinmq/amqp/exchange/topic.cr b/src/lavinmq/amqp/exchange/topic.cr index 0762e49171..40bbfb75ad 100644 --- a/src/lavinmq/amqp/exchange/topic.cr +++ b/src/lavinmq/amqp/exchange/topic.cr @@ -3,8 +3,8 @@ require "./exchange" module LavinMQ module AMQP class TopicExchange < Exchange - @bindings = Hash(Array(String), Set(LavinMQ::Destination)).new do |h, k| - h[k] = Set(LavinMQ::Destination).new + @bindings = Hash(Array(String), Set(AMQP::Destination)).new do |h, k| + h[k] = Set(AMQP::Destination).new end def type : String @@ -42,28 +42,26 @@ module LavinMQ true end - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) - select_matches(routing_key).each - end - # ameba:disable Metrics/CyclomaticComplexity - private def select_matches(routing_key) : Iterator(LavinMQ::Destination) + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) binding_keys = @bindings - return Iterator(LavinMQ::Destination).empty if binding_keys.empty? + return if binding_keys.empty? # optimize the case where the only binding key is '#' if binding_keys.size == 1 bk, qs = binding_keys.first if bk.size == 1 if bk.first == "#" - return qs.each + qs.each do |q| + yield q + end end end end rk_parts = routing_key.split(".") - binding_keys.each.select do |bks, _| + binding_keys.each do |bks, destinations| ok = false prev_hash = false size = bks.size # binding keys can max be 256 chars long anyway @@ -120,8 +118,12 @@ module LavinMQ break unless ok i += 1 end - ok - end.flat_map { |_, v| v.each } + if ok + destinations.each do |destination| + yield destination + end + end + end end end end diff --git a/src/lavinmq/mqtt/exchange.cr b/src/lavinmq/mqtt/exchange.cr index df0a04c4c4..8213a1dc8c 100644 --- a/src/lavinmq/mqtt/exchange.cr +++ b/src/lavinmq/mqtt/exchange.cr @@ -85,8 +85,7 @@ module LavinMQ end # Only here to make superclass happy - protected def bindings(routing_key, headers) : Iterator(LavinMQ::Destination) - Iterator(LavinMQ::Destination).empty + protected def each_destination(routing_key : String, headers : AMQP::Table?, & : LavinMQ::Destination ->) end def bind(destination : MQTT::Session, routing_key : String, headers = nil) : Bool