From 3826c6161b34f02531f805f7d593c360f1568f4e Mon Sep 17 00:00:00 2001
From: Jason Axelson
Date: Sun, 28 Apr 2024 08:27:19 -1000
Subject: [PATCH] Log when jobs don't finish executing within `:shutdown_grace_period`

If jobs take longer than `:shutdown_grace_period` (by default 15 seconds) then
they are brutally killed. Log a message when this occurs.

The message looks like:
> Oban's :alpha queue was unable to cleanly shutdown in allotted time (:shutdown_grace_period was 10). Remaining job ids: (job ids: [1941])

Non-integer `:shutdown` values (e.g. `:infinity`) are still passed to the
child spec unchanged instead of raising a `CaseClauseError`.
---
 lib/oban/queue/supervisor.ex |  2 ++
 lib/oban/queue/watchman.ex   | 50 ++++++++++++++++++++++++++++++++++---
 test/oban_test.exs           | 21 ++++++++++++++++++---
 3 files changed, 66 insertions(+), 7 deletions(-)

diff --git a/lib/oban/queue/supervisor.ex b/lib/oban/queue/supervisor.ex
index f9e06ef6..e5dfe8f0 100644
--- a/lib/oban/queue/supervisor.ex
+++ b/lib/oban/queue/supervisor.ex
@@ -36,9 +36,11 @@ defmodule Oban.Queue.Supervisor do
       |> Keyword.put_new(:dispatch_cooldown, conf.dispatch_cooldown)
 
     watch_opts = [
+      conf_name: conf.name,
       foreman: fore_name,
       name: watch_name,
       producer: prod_name,
+      queue: queue,
       shutdown: conf.shutdown_grace_period
     ]
 
diff --git a/lib/oban/queue/watchman.ex b/lib/oban/queue/watchman.ex
index 5d3bc243..924d1488 100644
--- a/lib/oban/queue/watchman.ex
+++ b/lib/oban/queue/watchman.ex
@@ -6,20 +6,30 @@ defmodule Oban.Queue.Watchman do
   alias Oban.Queue.Producer
   alias __MODULE__, as: State
 
+  require Logger
+
+  # Give a little extra shutdown time so we can report an unclean shutdown that
+  # might leave orphaned jobs
+  @extra_shutdown_time 250
+
   @type option ::
           {:foreman, GenServer.name()}
+          | {:conf_name, Oban.name()}
           | {:name, module()}
           | {:producer, GenServer.name()}
+          | {:queue, Oban.queue_name()}
           | {:shutdown, timeout()}
 
-  defstruct [:foreman, :producer, :shutdown, interval: 10]
+  defstruct [:foreman, :producer, :queue, :conf_name, :shutdown, interval: 10]
 
   @spec child_spec([option]) :: Supervisor.child_spec()
   def child_spec(opts) do
     shutdown =
       case opts[:shutdown] do
         0 -> :brutal_kill
+        value when is_integer(value) -> value + @extra_shutdown_time
+        # Non-integer timeouts such as :infinity can't be padded, pass them through as-is
         value -> value
       end
 
     %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}, shutdown: shutdown}
@@ -53,15 +63,47 @@ defmodule Oban.Queue.Watchman do
     :ok
   end
 
-  defp wait_for_executing(state) do
+  defp wait_for_executing(state, start_time \\ :erlang.monotonic_time(:millisecond)) do
     case DynamicSupervisor.count_children(state.foreman) do
       %{active: 0} ->
         :ok
 
       _ ->
-        :ok = Process.sleep(state.interval)
+        if shutdown_time_exceeded?(state, :erlang.monotonic_time(:millisecond) - start_time) do
+          print_non_clean_exit_message(state)
+          :ok
+        else
+          :ok = Process.sleep(state.interval)
+
+          wait_for_executing(state, start_time)
+        end
+    end
+  end
+
+  defp print_non_clean_exit_message(%State{conf_name: nil}), do: :ok
 
-        wait_for_executing(state)
+  defp print_non_clean_exit_message(%State{} = state) do
+    case Oban.check_queue(state.conf_name, queue: state.queue) do
+      %{running: []} ->
+        :ok
+
+      %{running: running_job_ids} ->
+        jobs_message = "(job ids: [#{Enum.join(running_job_ids, ", ")}])"
+
+        Logger.warning(
+          "Oban's :#{state.queue} queue was unable to cleanly shutdown in allotted time " <>
+            "(:shutdown_grace_period was #{state.shutdown}). Remaining job ids: #{jobs_message}"
+        )
     end
+  catch
+    :exit, reason ->
+      Logger.warning("watchman error reason: #{inspect(reason, pretty: true)}")
   end
+
+  defp shutdown_time_exceeded?(%State{shutdown: shutdown}, elapsed_time)
+       when is_integer(shutdown) and elapsed_time > shutdown do
+    true
+  end
+
+  defp shutdown_time_exceeded?(_, _), do: false
 end
diff --git a/test/oban_test.exs b/test/oban_test.exs
index fe5da753..f0505e61 100644
--- a/test/oban_test.exs
+++ b/test/oban_test.exs
@@ -43,7 +43,13 @@ defmodule ObanTest do
       assert_receive {:started, 1}
       assert_receive {:started, 2}
 
-      :ok = stop_supervised(name)
+      log =
+        ExUnit.CaptureLog.capture_log(fn ->
+          :ok = stop_supervised(name)
+        end)
+
+      assert log =~ "Oban's :alpha queue was unable to cleanly shutdown in allotted time"
+      assert log =~ "job ids: [#{id_2}]"
 
       assert_receive {:ok, 1}
       refute_receive {:ok, 2}, 20
@@ -64,7 +70,9 @@ defmodule ObanTest do
 
       assert_receive {:started, 1}
 
-      :ok = stop_supervised(name)
+      ExUnit.CaptureLog.capture_log(fn ->
+        :ok = stop_supervised(name)
+      end)
 
       insert!(ref: 2, sleep: 50)
 
@@ -86,7 +94,10 @@ defmodule ObanTest do
       assert_receive {:started, 1}
       assert_receive {:started, 3}
 
-      {time, _} = :timer.tc(fn -> stop_supervised(name) end)
+      {{time, _}, _log} =
+        ExUnit.CaptureLog.with_log(fn ->
+          :timer.tc(fn -> stop_supervised(name) end)
+        end)
 
       assert System.convert_time_unit(time, :microsecond, :millisecond) >= 10
 
@@ -115,6 +126,10 @@ defmodule ObanTest do
 
       assert %{limit: 2, queue: "gamma", running: [_]} = Oban.check_queue(name, queue: :gamma)
       assert %{paused: true, queue: "delta", running: []} = Oban.check_queue(name, queue: :delta)
+
+      ExUnit.CaptureLog.capture_log(fn ->
+        :ok = stop_supervised(name)
+      end)
     end
 
     test "checking an unknown or invalid queue" do