Skip to content

Commit

Permalink
Disable estimation for non-postgres adapters
Browse files Browse the repository at this point in the history
Without an estimate function or guessing the Reporter queries are
compatible with SQLite and MySQL adapters.
  • Loading branch information
sorentwo committed Dec 22, 2024
1 parent ea64120 commit 3cb4b0d
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 13 deletions.
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ oban_met-*.tar
# Temporary files, for example, from tests.
/tmp/

# Temporary release notes
NOTES.md
# Ignore local .db files
/priv/*.db*
14 changes: 10 additions & 4 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
import Config

config :oban_met, auto_start: false, reporter: [estimate_limit: 1000]
config :logger, level: :warning

config :oban_met, ecto_repos: [Oban.Pro.Repo]
config :oban_met, auto_start: false, reporter: [estimate_limit: 1000]

config :oban_met, Oban.Met.Repo,
priv: "test/support/",
priv: "test/support/postgres",
url: System.get_env("DATABASE_URL") || "postgres://localhost:5432/oban_met_test",
pool: Ecto.Adapters.SQL.Sandbox

config :logger, level: :warning
config :oban_met, Oban.Met.LiteRepo,
database: "priv/oban.db",
priv: "test/support/sqlite",
stacktrace: true,
temp_store: :memory

config :oban_met, ecto_repos: [Oban.Met.Repo, Oban.Met.LiteRepo]

config :stream_data, max_runs: 40
19 changes: 16 additions & 3 deletions lib/oban/met/reporter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ defmodule Oban.Met.Reporter do

@spec start_link(Keyword.t()) :: GenServer.on_start()
def start_link(opts) do
conf = Keyword.fetch!(opts, :conf)

opts =
if conf.repo.__adapter__() == Ecto.Adapters.Postgres do
opts
else
opts
|> Keyword.put(:auto_migrate, false)
|> Keyword.put(:estimate_limit, :infinity)
end

state = struct!(State, opts)

GenServer.start_link(__MODULE__, state, name: opts[:name])
Expand Down Expand Up @@ -118,16 +129,18 @@ defmodule Oban.Met.Reporter do
# EXECUTE as we're doing here. A named function helps the performance because it is prepared,
# and we have to support distributed databases that don't allow DO/END functions.
defp create_estimate_function(%{auto_migrate: true, function_created?: false} = state) do
%{conf: %{prefix: prefix}} = state

query = """
CREATE OR REPLACE FUNCTION #{state.conf.prefix}.oban_count_estimate(state text, queue text)
CREATE OR REPLACE FUNCTION #{prefix}.oban_count_estimate(state text, queue text)
RETURNS integer AS $func$
DECLARE
plan jsonb;
BEGIN
EXECUTE 'EXPLAIN (FORMAT JSON)
SELECT id
FROM #{state.conf.prefix}.oban_jobs
WHERE state = $1::#{state.conf.prefix}.oban_job_state
FROM #{prefix}.oban_jobs
WHERE state = $1::#{prefix}.oban_job_state
AND queue = $2'
INTO plan
USING state, queue;
Expand Down
5 changes: 3 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ defmodule Oban.Met.MixProject do
"hex.publish package --yes",
"lys.publish"
],
"test.reset": ["ecto.drop -r Oban.Met.Repo", "test.setup"],
"test.setup": ["ecto.create -r Oban.Met.Repo --quiet", "ecto.migrate -r Oban.Met.Repo"],
"test.reset": ["ecto.drop --quiet", "test.setup"],
"test.setup": ["ecto.create --quiet", "ecto.migrate --quiet"],
"test.ci": [
"format --check-formatted",
"deps.unlock --check-unused",
Expand All @@ -82,6 +82,7 @@ defmodule Oban.Met.MixProject do
[
{:oban, "~> 2.15"},
{:telemetry, "~> 1.1"},
{:ecto_sqlite3, "~> 0.9", only: [:test, :dev]},
{:postgrex, "~> 0.19", only: [:test, :dev]},
{:stream_data, "~> 1.1", only: [:test, :dev]},
{:benchee, "~> 1.3", only: [:test, :dev], runtime: false},
Expand Down
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
%{
"benchee": {:hex, :benchee, "1.3.1", "c786e6a76321121a44229dde3988fc772bca73ea75170a73fd5f4ddf1af95ccf", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "76224c58ea1d0391c8309a8ecbfe27d71062878f59bd41a390266bf4ac1cc56d"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"cc_precompiler": {:hex, :cc_precompiler, "0.1.10", "47c9c08d8869cf09b41da36538f62bc1abd3e19e41701c2cea2675b53c704258", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "f6e046254e53cd6b41c6bacd70ae728011aa82b2742a80d6e2214855c6e06b22"},
"credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"},
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
"decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"},
Expand All @@ -9,8 +10,11 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
"ecto": {:hex, :ecto, "3.12.3", "1a9111560731f6c3606924c81c870a68a34c819f6d4f03822f370ea31a582208", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9efd91506ae722f95e48dc49e70d0cb632ede3b7a23896252a60a14ac6d59165"},
"ecto_sql": {:hex, :ecto_sql, "3.12.0", "73cea17edfa54bde76ee8561b30d29ea08f630959685006d9c6e7d1e59113b7d", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dc9e4d206f274f3947e96142a8fdc5f69a2a6a9abb4649ef5c882323b6d512f0"},
"ecto_sqlite3": {:hex, :ecto_sqlite3, "0.17.5", "fbee5c17ff6afd8e9ded519b0abb363926c65d30b27577232bb066b2a79957b8", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "~> 3.12", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:exqlite, "~> 0.22", [hex: :exqlite, repo: "hexpm", optional: false]}], "hexpm", "3b54734d998cbd032ac59403c36acf4e019670e8b6ceef9c6c33d8986c4e9704"},
"elixir_make": {:hex, :elixir_make, "0.9.0", "6484b3cd8c0cee58f09f05ecaf1a140a8c97670671a6a0e7ab4dc326c3109726", [:mix], [], "hexpm", "db23d4fd8b757462ad02f8aa73431a426fe6671c80b200d9710caf3d1dd0ffdb"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"exqlite": {:hex, :exqlite, "0.27.1", "73fc0b3dc3b058a77a2b3771f82a6af2ddcf370b069906968a34083d2ffd2884", [:make, :mix], [{:cc_precompiler, "~> 0.1", [hex: :cc_precompiler, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:elixir_make, "~> 0.8", [hex: :elixir_make, repo: "hexpm", optional: false]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "79ef5756451cfb022e8013e1ed00d0f8f7d1333c19502c394dc16b15cfb4e9b4"},
"file_system": {:hex, :file_system, "1.0.1", "79e8ceaddb0416f8b8cd02a0127bdbababe7bf4a23d2a395b983c1f8b3f73edd", [:mix], [], "hexpm", "4414d1f38863ddf9120720cd976fce5bdde8e91d8283353f0e31850fa89feb9e"},
"hex_core": {:hex, :hex_core, "0.10.0", "6e739a159b0141fa6c3c60c92b73aa6dec5b7909647a9b9ecea9da6709b75709", [:rebar3], [], "hexpm", "1c229aeb2df3a7ffc0c00fa4fc1721995058b2c617f083cf617e29258b1d9f57"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
Expand Down
17 changes: 17 additions & 0 deletions test/oban/met/reporter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ defmodule Oban.Met.ReporterTest do
[]
)
end

@tag lite: true
@tag oban_opts: [engine: Oban.Engines.Lite, repo: Oban.Met.LiteRepo, testing: :disabled]
test "reporting estimates for the Lite engine", %{conf: conf} do
pid = start_supervised!({Reporter, conf: conf, name: @name, estimate_limit: 0})

Oban.insert!(conf.name, Job.new(%{}, queue: "alpha", worker: "Worker.A"))
Oban.insert!(conf.name, Job.new(%{}, queue: "gamma", worker: "Worker.B"))

Notifier.listen(conf.name, [:metrics])
send(pid, :checkpoint)

assert_receive {:notification, :metrics, %{"metrics" => metrics}}

assert "available" in utake(metrics, "state")
assert ~w(alpha gamma) = utake(metrics, "queue")
end
end

defp utake(metrics, key) do
Expand Down
6 changes: 4 additions & 2 deletions test/support/case.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ defmodule Oban.Met.Case do
end

setup tags do
pid = Sandbox.start_owner!(Oban.Met.Repo, shared: not tags[:async])
if !tags[:lite] do
pid = Sandbox.start_owner!(Oban.Met.Repo, shared: not tags[:async])

on_exit(fn -> Sandbox.stop_owner(pid) end)
on_exit(fn -> Sandbox.stop_owner(pid) end)
end

:ok
end
Expand Down
8 changes: 8 additions & 0 deletions test/support/repo.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
defmodule Oban.Met.Repo do
@moduledoc false

use Ecto.Repo, otp_app: :oban_met, adapter: Ecto.Adapters.Postgres
end

defmodule Oban.Met.LiteRepo do
@moduledoc false

use Ecto.Repo, otp_app: :oban_met, adapter: Ecto.Adapters.SQLite3
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule Oban.Met.LiteRepo.Migrations.AddObanJobsTable do
use Ecto.Migration

defdelegate up, to: Oban.Migration
defdelegate down, to: Oban.Migration
end
1 change: 1 addition & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
Oban.Met.Repo.start_link()
Oban.Met.LiteRepo.start_link()
ExUnit.start()
Ecto.Adapters.SQL.Sandbox.mode(Oban.Met.Repo, :manual)

0 comments on commit 3cb4b0d

Please sign in to comment.