Skip to content

Commit

Permalink
Skip queue existence check for pause/resume all
Browse files Browse the repository at this point in the history
Closes #1000
  • Loading branch information
sorentwo committed Dec 13, 2023
1 parent d57fb5e commit e7e0431
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
15 changes: 11 additions & 4 deletions lib/oban.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1322,11 +1322,18 @@ defmodule Oban do
end

defp validate_queue_exists!(name, opts) do
queue = opts[:queue]
local_only = opts[:local_only]
queue = Keyword.fetch!(opts, :queue)
lonly = Keyword.get(opts, :local_only, false)

if local_only && is_nil(Registry.whereis(name, {:producer, to_string(queue)})) do
raise ArgumentError, "queue #{inspect(queue)} does not exist locally"
cond do
queue == :* ->
:ok

lonly and is_nil(Registry.whereis(name, {:producer, to_string(queue)})) ->
raise ArgumentError, "queue #{inspect(queue)} does not exist locally"

true ->
:ok
end
end
end
28 changes: 28 additions & 0 deletions test/oban_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,34 @@ defmodule ObanTest do
assert %{paused: false} = Oban.check_queue(name, queue: :delta)
end)
end

test "pausing and resuming all local queues" do
opts = [queues: [alpha: 1, gamma: 1]]

name1 = start_supervised_oban!(opts)
name2 = start_supervised_oban!(opts)

assert :ok = Oban.pause_all_queues(name1, local_only: true)

with_backoff(fn ->
assert %{paused: true} = Oban.check_queue(name1, queue: :alpha)
assert %{paused: true} = Oban.check_queue(name1, queue: :gamma)

assert %{paused: false} = Oban.check_queue(name2, queue: :alpha)
assert %{paused: false} = Oban.check_queue(name2, queue: :gamma)
end)

assert :ok = Oban.pause_all_queues(name2, local_only: true)
assert :ok = Oban.resume_all_queues(name1, local_only: true)

with_backoff(fn ->
assert %{paused: false} = Oban.check_queue(name1, queue: :alpha)
assert %{paused: false} = Oban.check_queue(name1, queue: :gamma)

assert %{paused: true} = Oban.check_queue(name2, queue: :alpha)
assert %{paused: true} = Oban.check_queue(name2, queue: :gamma)
end)
end
end

describe "scale_queue/2" do
Expand Down

0 comments on commit e7e0431

Please sign in to comment.