diff --git a/lib/oban.ex b/lib/oban.ex index 66bd317e..9a291e5b 100644 --- a/lib/oban.ex +++ b/lib/oban.ex @@ -83,8 +83,10 @@ defmodule Oban do defguardp is_list_or_wrapper(cw) when is_list(cw) or - (is_map(cw) and is_map_key(cw, :changesets) and is_list(cw.changesets)) or - is_function(cw, 1) + is_struct(cw, Stream) or + is_function(cw, 1) or + (is_map_key(cw, :changesets) and is_list(cw.changesets)) or + (is_map_key(cw, :changesets) and is_struct(cw.changesets, Stream)) @doc """ Creates a facade for `Oban` functions and automates fetching configuration from the application @@ -602,12 +604,19 @@ defmodule Oban do ## Example - Insert 100 jobs with a single operation: + Insert a list of 100 jobs at once: 1..100 |> Enum.map(&MyApp.Worker.new(%{id: &1})) |> Oban.insert_all() + Insert a stream of jobs at once (be sure the stream terminates!): + + (fn -> MyApp.Worker.new(%{})) + |> Stream.repeatedly() + |> Stram.take(100) + |> Oban.insert_all() + Insert with a custom timeout: 1..100 diff --git a/lib/oban/engine.ex b/lib/oban/engine.ex index 6c4f677a..4c2e35d7 100644 --- a/lib/oban/engine.ex +++ b/lib/oban/engine.ex @@ -303,6 +303,7 @@ defmodule Oban.Engine do defp expand(fun, changes) when is_function(fun, 1), do: expand(fun.(changes), changes) defp expand(%{changesets: changesets}, _), do: expand(changesets, %{}) + defp expand(changesets, _) when is_struct(changesets, Stream), do: changesets defp expand(changesets, _) when is_list(changesets), do: changesets defp with_span(event, %Config{} = conf, base_meta, fun) do diff --git a/lib/oban/job.ex b/lib/oban/job.ex index b510b586..6ab1631a 100644 --- a/lib/oban/job.ex +++ b/lib/oban/job.ex @@ -124,7 +124,7 @@ defmodule Oban.Job do @type changeset :: Ecto.Changeset.t(t()) @type changeset_fun :: (map() -> changeset()) - @type changeset_list :: [changeset()] + @type changeset_list :: Enumerable.t(changeset()) @type changeset_list_fun :: (map() -> changeset_list()) schema "oban_jobs" do diff --git a/test/oban/engine_test.exs b/test/oban/engine_test.exs index 2ad586ed..dec6a453 100644 --- a/test/oban/engine_test.exs +++ b/test/oban/engine_test.exs @@ -313,10 +313,22 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do assert_receive {:event, [:insert_all_jobs, :stop], _, %{jobs: _, opts: []}} end + test "inserting multiple jobs with a stream", %{name: name} do + changesets = Stream.map(0..1, &Worker.new(%{ref: &1})) + + [_job_1, _job_2] = Oban.insert_all(name, changesets) + end + test "inserting multiple jobs from a changeset wrapper", %{name: name} do - wrap = %{changesets: [Worker.new(%{ref: 0}), Worker.new(%{ref: 1})]} + changesets = [Worker.new(%{ref: 0}), Worker.new(%{ref: 1})] + + [_job_1, _job_2] = Oban.insert_all(name, %{changesets: changesets}) + end + + test "inserting multiple jobs from a changeset wrapped stream", %{name: name} do + changesets = Stream.map(0..1, &Worker.new(%{ref: &1})) - [_job_1, _job_2] = Oban.insert_all(name, wrap) + [_job_1, _job_2] = Oban.insert_all(name, %{changesets: changesets}) end test "handling empty changesets list from a wrapper", %{name: name} do diff --git a/test/support/exercise.ex b/test/support/exercise.ex index 03ee0a0b..ecdb961c 100644 --- a/test/support/exercise.ex +++ b/test/support/exercise.ex @@ -20,6 +20,7 @@ defmodule Oban.Test.Exercise do def check_insert_all do changeset = changeset() + stream = Stream.duplicate(changeset, 1) wrapper = %{changesets: [changeset]} [_ | _] = Oban.insert_all([changeset]) @@ -27,6 +28,11 @@ defmodule Oban.Test.Exercise do [_ | _] = Oban.insert_all([changeset], timeout: 500) [_ | _] = Oban.insert_all(Oban, [changeset], timeout: 500) + [_ | _] = Oban.insert_all(stream) + [_ | _] = Oban.insert_all(Oban, stream) + [_ | _] = Oban.insert_all(stream, timeout: 500) + [_ | _] = Oban.insert_all(Oban, stream, timeout: 500) + [_ | _] = Oban.insert_all(wrapper) [_ | _] = Oban.insert_all(Oban, wrapper) [_ | _] = Oban.insert_all(wrapper, timeout: 500)