From 8100a998825321bef30657e807ddb8750172b18c Mon Sep 17 00:00:00 2001 From: Viktor Erlingsson Date: Wed, 10 Jan 2024 11:39:59 +0100 Subject: [PATCH] Use DirectDispatcher for logging (#619) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use DirectDispatcher for logging and adds more logging to startup procedure. --------- Co-authored-by: Carl Hörberg --- CHANGELOG.md | 1 + src/lavinmq/launcher.cr | 4 ++-- src/lavinmq/queue/message_store.cr | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d717384339..d20ffa65ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Build binaries and container images using Crystal 1.11.0 - Don't allow clients open an already open channel +- Use DirectDispatcher for logging and add more logging to startup procedure [#619](https://github.com/cloudamqp/lavinmq/pull/619) ### Added diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 50dc6964f4..d08b362cba 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -82,9 +82,9 @@ module LavinMQ log_file = (path = @config.log_file) ? File.open(path, "a") : STDOUT broadcast_backend = ::Log::BroadcastBackend.new backend = if ENV.has_key?("JOURNAL_STREAM") - ::Log::IOBackend.new(io: log_file, formatter: JournalLogFormat) + ::Log::IOBackend.new(io: log_file, formatter: JournalLogFormat, dispatcher: ::Log::DirectDispatcher) else - ::Log::IOBackend.new(io: log_file, formatter: StdoutLogFormat) + ::Log::IOBackend.new(io: log_file, formatter: StdoutLogFormat, dispatcher: ::Log::DirectDispatcher) end broadcast_backend.append(backend, @config.log_level) diff --git a/src/lavinmq/queue/message_store.cr b/src/lavinmq/queue/message_store.cr index cfe5bf181a..e772a8aaa2 100644 --- a/src/lavinmq/queue/message_store.cr +++ b/src/lavinmq/queue/message_store.cr @@ -265,6 +265,12 @@ module LavinMQ end private def load_deleted_from_disk + count = 0u32 + ack_files = 0u32 + Dir.each(@data_dir) do |f| + ack_files += 1 if f.starts_with? "acks." + end + Dir.each_child(@data_dir) do |child| next unless child.starts_with? "acks." seg = child[5, 10].to_u32 @@ -282,6 +288,7 @@ module LavinMQ end @replicator.try &.register_file(file) end + Log.info { "Loading acks (#{count}/#{ack_files})" } if (count += 1) % 128 == 0 @deleted[seg] = acked.sort! unless acked.empty? end end @@ -320,6 +327,7 @@ module LavinMQ # Populate bytesize, size and segment_msg_count private def load_stats_from_segments : Nil + counter = 0 @segments.each do |seg, mfile| count = 0u32 loop do @@ -337,6 +345,7 @@ module LavinMQ end mfile.pos = 4 mfile.unmap # will be mmap on demand + Log.info { "Loading stats (#{counter}/#{@segments.size})" } if (counter += 1) % 128 == 0 @segment_msg_count[seg] = count end end