diff --git a/lib/realtime/api/tenant.ex b/lib/realtime/api/tenant.ex index 924119708..7b9237c26 100644 --- a/lib/realtime/api/tenant.ex +++ b/lib/realtime/api/tenant.ex @@ -25,7 +25,6 @@ defmodule Realtime.Api.Tenant do field(:suspend, :boolean, default: false) field(:events_per_second_rolling, :float, virtual: true) field(:events_per_second_now, :integer, virtual: true) - field(:notify_private_alpha, :boolean, default: false) field(:private_only, :boolean, default: false) has_many(:extensions, Realtime.Api.Extensions, @@ -74,7 +73,6 @@ defmodule Realtime.Api.Tenant do :max_channels_per_client, :max_joins_per_second, :suspend, - :notify_private_alpha, :private_only ]) |> validate_required([ @@ -113,7 +111,6 @@ defmodule Realtime.Api.Tenant do :max_channels_per_client, :max_joins_per_second, :suspend, - :notify_private_alpha, :private_only ]) end diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 44d203d9a..3706660cd 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -280,7 +280,6 @@ defmodule Realtime.Tenants do :max_channels_per_client :max_joins_per_second :suspend - :notify_private_alpha :private_only """ @spec update_management(String.t(), map()) :: Tenant.t() | nil diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 344e6be9c..065e57c00 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -194,8 +194,8 @@ defmodule Realtime.Tenants.Connect do def handle_continue(:start_listen_and_replication, state) do %{tenant: tenant} = state - with {:ok, broadcast_changes_pid} <- start_replication(tenant, self()), - {:ok, listen_pid} <- start_listen(tenant, self()) do + with {:ok, broadcast_changes_pid} <- ReplicationConnection.start(tenant, self()), + {:ok, listen_pid} <- Listen.start(tenant, self()) do {:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid}, {:continue, :setup_connected_user_events}} else @@ -336,12 +336,4 @@ defmodule Realtime.Tenants.Connect do defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended} defp tenant_suspended?(_), do: :ok - - defp start_replication(%{notify_private_alpha: false}, _), do: {:ok, nil} - - defp start_replication(tenant, connect_pid), - do: ReplicationConnection.start(tenant, connect_pid) - - defp start_listen(%{notify_private_alpha: false}, _), do: {:ok, nil} - defp start_listen(tenant, connect_pid), do: Listen.start(tenant, connect_pid) end diff --git a/mix.exs b/mix.exs index 3d17dfdfd..94dd5c09e 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.33.84", + version: "2.34.0", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index 3f666ad59..777cf842f 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -34,8 +34,7 @@ Repo.transaction(fn -> "ssl_enforced" => false } } - ], - "notify_private_alpha" => true + ] }) |> Repo.insert!() end) diff --git a/priv/repo/seeds_after_migration.exs b/priv/repo/seeds_after_migration.exs index d57f28401..68da74d4a 100644 --- a/priv/repo/seeds_after_migration.exs +++ b/priv/repo/seeds_after_migration.exs @@ -33,8 +33,7 @@ end } ], "external_id" => tenant_name, - "jwt_secret" => "secure_jwt_secret", - "notify_private_alpha" => true + "jwt_secret" => "secure_jwt_secret" } |> Api.create_tenant() diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 324f0cd4c..da9ca8da7 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -610,8 +610,7 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [ :authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence - ], - notify_private_alpha: true + ] test "broadcast insert event changes on insert in table with trigger", %{ topic: topic, db_conn: db_conn, @@ -652,7 +651,6 @@ defmodule Realtime.Integration.RtChannelTest do :authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence ], - notify_private_alpha: true, requires_data: true test "broadcast update event changes on update in table with trigger", %{ topic: topic, @@ -703,8 +701,7 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [ :authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence - ], - notify_private_alpha: true + ] test "broadcast delete event changes on delete in table with trigger", %{ topic: topic, db_conn: db_conn, @@ -746,8 +743,7 @@ defmodule Realtime.Integration.RtChannelTest do @tag policies: [ :authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence - ], - notify_private_alpha: true + ] test "broadcast event when function 'send' is called with private topic", %{ topic: topic, db_conn: db_conn @@ -784,7 +780,6 @@ defmodule Realtime.Integration.RtChannelTest do 500 end - @tag notify_private_alpha: true test "broadcast event when function 'send' is called with public topic", %{ topic: topic, db_conn: db_conn diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index a9db5e0a5..a582dd6fd 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -42,6 +42,7 @@ defmodule Realtime.DatabaseTest do ) Database.replication_slot_teardown(conn, name) + Process.sleep(1000) assert %{rows: []} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", []) end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 50acd25ba..7bdf96bfd 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -12,13 +12,13 @@ defmodule Realtime.Tenants.ConnectTest do alias Realtime.Tenants.ReplicationConnection alias Realtime.UsersCounter - describe "lookup_or_start_connection/1" do - setup do - Cleanup.ensure_no_replication_slot() - tenant = tenant_fixture() - %{tenant: tenant} - end + setup do + Cleanup.ensure_no_replication_slot() + tenant = tenant_fixture() + %{tenant: tenant} + end + describe "lookup_or_start_connection/1" do test "if tenant exists and connected, returns the db connection", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) Sandbox.allow(Repo, self(), db_conn) @@ -76,7 +76,7 @@ defmodule Realtime.Tenants.ConnectTest do Sandbox.allow(Repo, self(), db_conn) # Not enough time has passed, connection still alive - :timer.sleep(500) + :timer.sleep(400) assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id) # Enough time has passed, connection stopped @@ -93,7 +93,6 @@ defmodule Realtime.Tenants.ConnectTest do {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10) - Process.link(db_conn) Sandbox.allow(Repo, self(), db_conn) assert {pid, %{conn: conn_pid}} = :syn.lookup(Connect, tenant_id) @@ -110,7 +109,6 @@ defmodule Realtime.Tenants.ConnectTest do {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10) - Process.link(db_conn) assert {_pid, %{conn: _conn_pid}} = :syn.lookup(Connect, tenant_id) :timer.sleep(1000) :syn.leave(:users, tenant_id, self()) @@ -189,8 +187,7 @@ defmodule Realtime.Tenants.ConnectTest do end end - test "starts broadcast handler and does not fail on existing connection" do - tenant = tenant_fixture(%{notify_private_alpha: true}) + test "starts broadcast handler and does not fail on existing connection", %{tenant: tenant} do on_exit(fn -> Connect.shutdown(tenant.external_id) end) assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id) @@ -205,8 +202,7 @@ defmodule Realtime.Tenants.ConnectTest do assert replication_connection_before == replication_connection_after end - test "failed broadcast handler and listen recover from failure" do - tenant = tenant_fixture(%{notify_private_alpha: true}) + test "failed broadcast handler and listen recover from failure", %{tenant: tenant} do on_exit(fn -> Connect.shutdown(tenant.external_id) end) assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id) :timer.sleep(3000) @@ -229,8 +225,7 @@ defmodule Realtime.Tenants.ConnectTest do assert Process.alive?(listen_pid) end - test "on database disconnect, connection is killed to all components" do - tenant = tenant_fixture(%{notify_private_alpha: true}) + test "on database disconnect, connection is killed to all components", %{tenant: tenant} do on_exit(fn -> Connect.shutdown(tenant.external_id) end) assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id) old_pid = Connect.whereis(tenant.external_id) @@ -257,9 +252,7 @@ defmodule Realtime.Tenants.ConnectTest do end describe "shutdown/1" do - test "shutdowns all associated connections" do - tenant = tenant_fixture(%{notify_private_alpha: true}) - + test "shutdowns all associated connections", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) Process.sleep(1000) diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs index f7c1784c1..1e5c00e44 100644 --- a/test/realtime_web/controllers/broadcast_controller_test.exs +++ b/test/realtime_web/controllers/broadcast_controller_test.exs @@ -242,10 +242,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do start_supervised(Realtime.RateCounter.DynamicSupervisor) start_supervised(Realtime.GenCounter.DynamicSupervisor) start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) + tenant = tenant_fixture() jwt_secret = Crypto.decrypt!(tenant.jwt_secret) - {:ok, _} = start_supervised({Connect, tenant_id: tenant.external_id}, restart: :transient) + {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id) {:ok, db_conn} = Connect.get_status(tenant.external_id) clean_table(db_conn, "realtime", "messages") @@ -261,6 +262,8 @@ defmodule RealtimeWeb.BroadcastControllerTest do |> put_req_header("authorization", "Bearer #{jwt}") |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + on_exit(fn _ -> Connect.shutdown(tenant.external_id) end) + {:ok, conn: conn, db_conn: db_conn, tenant: tenant} end diff --git a/test/realtime_web/controllers/tenant_controller_test.exs b/test/realtime_web/controllers/tenant_controller_test.exs index a85a3e206..69e1e0580 100644 --- a/test/realtime_web/controllers/tenant_controller_test.exs +++ b/test/realtime_web/controllers/tenant_controller_test.exs @@ -353,6 +353,6 @@ defmodule RealtimeWeb.TenantControllerTest do end defp create_tenant(_) do - %{tenant: tenant_fixture(%{notify_private_alpha: true})} + %{tenant: tenant_fixture()} end end diff --git a/test/support/generators.ex b/test/support/generators.ex index adb0e97ca..1ea2e6a7d 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -29,8 +29,7 @@ defmodule Generators do ], "postgres_cdc_default" => "postgres_cdc_rls", "jwt_secret" => "new secret", - "jwt_jwks" => nil, - "notify_private_alpha" => false + "jwt_jwks" => nil } override = override |> Enum.map(fn {k, v} -> {"#{k}", v} end) |> Map.new()