diff --git a/src/lavinmq/mqtt/broker.cr b/src/lavinmq/mqtt/broker.cr index d8f311998..5aee32be6 100644 --- a/src/lavinmq/mqtt/broker.cr +++ b/src/lavinmq/mqtt/broker.cr @@ -74,7 +74,7 @@ module LavinMQ end def subscribe(client, packet) - session = sessions[client.client_id]? || sessions.declare(client.client_id, client.@clean_session) + session = sessions[client.client_id]? || sessions.declare(client) session.client = client qos = Array(MQTT::SubAck::ReturnCode).new(packet.topic_filters.size) packet.topic_filters.each do |tf| diff --git a/src/lavinmq/mqtt/sessions.cr b/src/lavinmq/mqtt/sessions.cr index 312722669..645a17cbb 100644 --- a/src/lavinmq/mqtt/sessions.cr +++ b/src/lavinmq/mqtt/sessions.cr @@ -18,10 +18,10 @@ module LavinMQ @queues["mqtt.#{client_id}"].as(Session) end - def declare(client_id : String, clean_session : Bool) - self[client_id]? || begin - @vhost.declare_queue("mqtt.#{client_id}", !clean_session, clean_session, AMQP::Table.new({"x-queue-type": "mqtt"})) - self[client_id] + def declare(client : Client) + self[client.client_id]? || begin + @vhost.declare_queue("mqtt.#{client.client_id}", !client.@clean_session, client.@clean_session, AMQP::Table.new({"x-queue-type": "mqtt"})) + self[client.client_id] end end