Skip to content

Commit

Permalink
Add initialize to amqp connection_factory (#922)
Browse files Browse the repository at this point in the history
* add initialize to amqp connection_factory

* for specs, re-init connection factory for server restart
  • Loading branch information
kickster97 authored Jan 23, 2025
1 parent 09a75d3 commit 09876b6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
21 changes: 12 additions & 9 deletions src/lavinmq/amqp/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@ module LavinMQ
class ConnectionFactory < LavinMQ::ConnectionFactory
Log = LavinMQ::Log.for "amqp.connection_factory"

def start(socket, connection_info, vhosts, users) : Client?
def initialize(@users : UserStore, @vhosts : VHostStore)
end

def start(socket, connection_info) : Client?
remote_address = connection_info.src
socket.read_timeout = 15.seconds
metadata = ::Log::Metadata.build({address: remote_address.to_s})
logger = Logger.new(Log, metadata)
if confirm_header(socket, logger)
if start_ok = start(socket, logger)
if user = authenticate(socket, remote_address, users, start_ok, logger)
if user = authenticate(socket, remote_address, start_ok, logger)
if tune_ok = tune(socket, logger)
if vhost = open(socket, vhosts, user, logger)
if vhost = open(socket, user, logger)
socket.read_timeout = heartbeat_timeout(tune_ok)
return LavinMQ::AMQP::Client.new(socket, connection_info, vhost, user, tune_ok, start_ok)
end
Expand All @@ -39,7 +42,7 @@ module LavinMQ
end
end

def confirm_header(socket, log) : Bool
def confirm_header(socket, log : Logger) : Bool
proto = uninitialized UInt8[8]
count = socket.read(proto.to_slice)
if count.zero? # EOF, socket closed by peer
Expand Down Expand Up @@ -71,7 +74,7 @@ module LavinMQ
},
})

def start(socket, log)
def start(socket, log : Logger)
start = AMQP::Frame::Connection::Start.new(server_properties: SERVER_PROPERTIES)
socket.write_bytes start, ::IO::ByteFormat::NetworkEndian
socket.flush
Expand Down Expand Up @@ -100,9 +103,9 @@ module LavinMQ
end
end

def authenticate(socket, remote_address, users, start_ok, log)
def authenticate(socket, remote_address, start_ok, log)
username, password = credentials(start_ok)
user = users[username]?
user = @users[username]?
return user if user && user.password && user.password.not_nil!.verify(password) &&
guest_only_loopback?(remote_address, user)

Expand Down Expand Up @@ -150,10 +153,10 @@ module LavinMQ
tune_ok
end

def open(socket, vhosts, user, log)
def open(socket, user, log)
open = AMQP::Frame.from_io(socket) { |f| f.as(AMQP::Frame::Connection::Open) }
vhost_name = open.vhost.empty? ? "/" : open.vhost
if vhost = vhosts[vhost_name]?
if vhost = @vhosts[vhost_name]?
if user.permissions[vhost_name]?
if vhost.max_connections.try { |max| vhost.connections.size >= max }
log.warn { "Max connections (#{vhost.max_connections}) reached for vhost #{vhost_name}" }
Expand Down
5 changes: 3 additions & 2 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ module LavinMQ
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
@amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new
@amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new(@users, @vhosts)
apply_parameter
spawn stats_loop, name: "Server#stats_loop"
end
Expand Down Expand Up @@ -66,6 +66,7 @@ module LavinMQ
@users = UserStore.new(@data_dir, @replicator)
@vhosts = VHostStore.new(@data_dir, @users, @replicator)
@parameters = ParameterStore(Parameter).new(@data_dir, "parameters.json", @replicator)
@amqp_connection_factory = LavinMQ::AMQP::ConnectionFactory.new(@users, @vhosts)
apply_parameter
@closed = false
Fiber.yield
Expand Down Expand Up @@ -245,7 +246,7 @@ module LavinMQ
end

def handle_connection(socket, connection_info)
client = @amqp_connection_factory.start(socket, connection_info, @vhosts, @users)
client = @amqp_connection_factory.start(socket, connection_info)
ensure
socket.close if client.nil?
end
Expand Down

0 comments on commit 09876b6

Please sign in to comment.