From f23a0567665358898726f4c4de831f91382f0903 Mon Sep 17 00:00:00 2001 From: T Floyd Wright Date: Sat, 2 Nov 2024 14:55:09 -0800 Subject: [PATCH] Add API for internal pubsub channel --- README.md | 4 ++ README.md.eex | 4 ++ dev/user_admin.ex | 46 +++++++-------------- lib/live_admin/components/container.ex | 15 ++++--- lib/live_admin/components/nav/jobs.ex | 25 ++++++----- lib/live_admin/components/resource/index.ex | 26 +++--------- lib/live_admin/notifier.ex | 23 +++++++++++ 7 files changed, 74 insertions(+), 69 deletions(-) create mode 100644 lib/live_admin/notifier.ex diff --git a/README.md b/README.md index ceab2ed..a09ee67 100644 --- a/README.md +++ b/README.md @@ -146,6 +146,10 @@ def MyModule end ``` +### Notifications + +LiveAdmin.Notifier provides an API for a basic notification system that can be used to trigger events from LiveAdmin.Resource modules, or other application code. + ### Multi tenancy To enable Multi tenant support, simply implement a `prefixes/0` function in your Ecto Repo module that returns a list of prefixes. diff --git a/README.md.eex b/README.md.eex index 171719c..78de395 100644 --- a/README.md.eex +++ b/README.md.eex @@ -146,6 +146,10 @@ def MyModule end ``` +### Notifications + +LiveAdmin.Notifier provides an API for a basic notification system that can be used to trigger events from LiveAdmin.Resource modules, or other application code. + ### Multi tenancy To enable Multi tenant support, simply implement a `prefixes/0` function in your Ecto Repo module that returns a list of prefixes. diff --git a/dev/user_admin.ex b/dev/user_admin.ex index d5edbd4..7c1dd36 100644 --- a/dev/user_admin.ex +++ b/dev/user_admin.ex @@ -62,38 +62,20 @@ defmodule DemoWeb.UserAdmin do Each user will get 16 random bytes of their very own. """ def regenerate_passwords(query, session) do - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, :regenerate, :start, "Regenerating user passwords"} - ) - - count = Demo.Repo.aggregate(Demo.Accounts.User, :count, prefix: session.prefix) - - query - |> Demo.Repo.all(prefix: session.prefix) - |> Enum.reduce(0.0, fn user, progress -> - user - |> Ecto.Changeset.change(encrypted_password: :crypto.strong_rand_bytes(16) |> Base.encode16()) - |> Demo.Repo.update() - - progress = progress + 1/count - - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, :regenerate, :progress, progress} - ) - - progress - end) - - - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, :regenerate, :complete} - ) + count = Demo.Repo.aggregate(Demo.Accounts.User, :count, prefix: session.prefix) + + query + |> Demo.Repo.all(prefix: session.prefix) + |> Enum.with_index() + |> Enum.each(fn {user, i} -> + user + |> Ecto.Changeset.change(encrypted_password: :crypto.strong_rand_bytes(16) |> Base.encode16()) + |> Demo.Repo.update() + + LiveAdmin.Notifier.job(session, self(), i/count) + end) + + LiveAdmin.Notifier.job(session, self(), 1) {:ok, "updated"} end diff --git a/lib/live_admin/components/container.ex b/lib/live_admin/components/container.ex index 7b5af4c..1a59e52 100644 --- a/lib/live_admin/components/container.ex +++ b/lib/live_admin/components/container.ex @@ -93,25 +93,28 @@ defmodule LiveAdmin.Components.Container do @impl true def handle_event( "task", - params = %{"name" => task}, + params = %{"name" => name}, socket = %{ assigns: %{session: session, resource: resource, config: config} } ) do {_, m, f, _, _} = - LiveAdmin.fetch_function(resource, session, :tasks, String.to_existing_atom(task)) + LiveAdmin.fetch_function(resource, session, :tasks, String.to_existing_atom(name)) args = [session | Map.get(params, "args", [])] search = Map.get(socket.assigns, :search) - Task.Supervisor.async_nolink(LiveAdmin.Task.Supervisor, m, f, [ - Resource.query(resource, search, config) | args - ]) + task = + Task.Supervisor.async_nolink(LiveAdmin.Task.Supervisor, m, f, [ + Resource.query(resource, search, config) | args + ]) + + LiveAdmin.Notifier.job(session, task.pid, 0, label: name) socket = socket - |> put_flash(:info, trans("%{task} started", inter: [task: task])) + |> put_flash(:info, trans("%{task} started", inter: [task: name])) |> push_navigate(to: route_with_params(socket.assigns)) {:noreply, socket} diff --git a/lib/live_admin/components/nav/jobs.ex b/lib/live_admin/components/nav/jobs.ex index 273b303..8a02e85 100644 --- a/lib/live_admin/components/nav/jobs.ex +++ b/lib/live_admin/components/nav/jobs.ex @@ -9,6 +9,18 @@ defmodule LiveAdmin.Components.Nav.Jobs do {:ok, assign(socket, jobs: []), layout: false} end + defp set_progress(socket, target_pid, progress) do + update(socket, :jobs, fn jobs -> + Enum.map(jobs, fn job = {pid, label, _} -> + if target_pid == pid do + {pid, label, progress} + else + job + end + end) + end) + end + @impl true def handle_info({:job, pid, :start, label}, socket) do Process.monitor(pid) @@ -20,16 +32,7 @@ defmodule LiveAdmin.Components.Nav.Jobs do @impl true def handle_info({:job, pid, :progress, progress}, socket) do - socket = - update(socket, :jobs, fn jobs -> - Enum.map(jobs, fn job = {job_pid, label, _} -> - if pid == job_pid do - {pid, label, progress} - else - job - end - end) - end) + socket = set_progress(socket, pid, progress) {:noreply, socket} end @@ -38,7 +41,7 @@ defmodule LiveAdmin.Components.Nav.Jobs do def handle_info({:job, pid, :complete}, socket) do Process.send_after(self(), {:remove_job, pid}, 1500) - {:noreply, socket} + {:noreply, set_progress(socket, pid, 1.0)} end @impl true diff --git a/lib/live_admin/components/resource/index.ex b/lib/live_admin/components/resource/index.ex index 74d6cd8..9a5f1fd 100644 --- a/lib/live_admin/components/resource/index.ex +++ b/lib/live_admin/components/resource/index.ex @@ -396,33 +396,19 @@ defmodule LiveAdmin.Components.Container.Index do fn -> pid = self() - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, pid, :start, label} - ) + LiveAdmin.Notifier.job(session, pid, 0, label: label) records = Resource.all(ids, resource, prefix, repo) - Enum.reduce(records, 0.0, fn record, progress -> + records + |> Enum.with_index() + |> Enum.each(fn {record, i} -> apply(mod, func, [record | args]) - progress = progress + 1 / length(records) - - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, pid, :progress, progress} - ) - - progress + LiveAdmin.Notifier.job(session, pid, i / length(records)) end) - Phoenix.PubSub.broadcast( - LiveAdmin.PubSub, - "session:#{session.id}", - {:job, pid, :complete} - ) + LiveAdmin.Notifier.job(session, pid, 1) end, timeout: :infinity ) diff --git a/lib/live_admin/notifier.ex b/lib/live_admin/notifier.ex new file mode 100644 index 0000000..192dcdc --- /dev/null +++ b/lib/live_admin/notifier.ex @@ -0,0 +1,23 @@ +defmodule LiveAdmin.Notifier do + @spec job(LiveAdmin.Session.t(), pid(), float() | integer(), [label: String.t()]) :: :ok + @spec job(LiveAdmin.Session.t(), pid(), float() | integer()) :: :ok + def job(session, pid, progress, opts \\ []) + + def job(session, pid, 0, opts), + do: broadcast(session, {:job, pid, :start, Keyword.get(opts, :label, "")}) + + def job(session, pid, progress, _) when progress >= 1, + do: broadcast(session, {:job, pid, :complete}) + + def job(session, pid, progress, _), do: broadcast(session, {:job, pid, :progress, progress}) + + def broadcast(session, info) do + Phoenix.PubSub.broadcast( + LiveAdmin.PubSub, + "session:#{session.id}", + info + ) + + :ok + end +end