Skip to content

Commit

Permalink
fix: reject connections if tenant db has low connections available
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Jan 30, 2025
1 parent a3f98f2 commit 984597c
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 57 deletions.
15 changes: 0 additions & 15 deletions lib/realtime/encryption.ex → lib/realtime/crypto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,6 @@ defmodule Realtime.Crypto do
|> unpad()
end

@doc """
Decrypts each of the given database credentials.

Takes the encrypted host, port, database name, user and password and returns
them, decrypted with `decrypt!/1`, as a tuple in that same order.
"""
@spec decrypt_creds(binary(), binary(), binary(), binary(), binary()) ::
        {binary(), binary(), binary(), binary(), binary()}
def decrypt_creds(host, port, name, user, pass) do
  [host, port, name, user, pass]
  |> Enum.map(&decrypt!/1)
  |> List.to_tuple()
end

defp pad(data) do
to_add = 16 - rem(byte_size(data), 16)
data <> :binary.copy(<<to_add>>, to_add)
Expand Down
46 changes: 39 additions & 7 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,38 @@ defmodule Realtime.Database do
}
end

# Factor applied to the required pool size before it is compared against the
# number of connections still available on the tenant database.
@available_connection_factor 0.95

# Guard that holds when the scaled pool requirement
# (`required_pool * @available_connection_factor`) is strictly lower than
# `available_connections`. Note that the factor scales the requirement, not
# the available connection count.
defguardp can_connect?(available_connections, required_pool)
when required_pool * @available_connection_factor < available_connections

@doc """
Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
"""
@spec check_tenant_connection(Tenant.t() | nil, binary()) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(tenant, application_name)
def check_tenant_connection(nil, _), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant, application_name) do
@spec check_tenant_connection(Tenant.t() | nil) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(nil), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant) do
tenant
|> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(fn settings ->
check_settings = from_settings(settings, application_name, :stop)
required_pool = tenant_pool_requirements(settings)
check_settings = from_settings(settings, "realtime_connect", :stop)
check_settings = Map.put(check_settings, :max_restarts, 0)

with {:ok, conn} <- connect_db(check_settings) do
case Postgrex.query(conn, "SELECT 1", []) do
{:ok, _} ->
query =
"select (current_setting('max_connections')::int - count(*))::int from pg_stat_activity"

case Postgrex.query(conn, query, []) do
{:ok, %{rows: [[available_connections]]}}
when can_connect?(available_connections, required_pool) ->
{:ok, conn}

{:ok, _} ->
{:error, :tenant_db_too_many_connections}

{:error, e} ->
Process.exit(conn, :kill)
log_error("UnableToConnectToTenantDatabase", e)
Expand Down Expand Up @@ -299,4 +312,23 @@ defmodule Realtime.Database do
Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
:ok
end

# Total number of connections the tenant needs: the sum of the pool size
# resolved by `pool_size_by_application_name/2` for every application name
# listed below, given the tenant's extension settings.
defp tenant_pool_requirements(settings) do
  ~w(
    realtime_subscription_manager
    realtime_subscription_manager_pub
    realtime_subscription_checker
    realtime_connect
    realtime_health_check
    realtime_janitor
    realtime_migrations
    realtime_broadcast_changes
    realtime_rls
    realtime_replication_slot_teardown
  )
  |> Enum.map(&pool_size_by_application_name(&1, settings))
  |> Enum.sum()
end
end
11 changes: 10 additions & 1 deletion lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ defmodule Realtime.Tenants.Connect do
{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, :killed} ->
{:error, :tenant_db_too_many_connections} ->
{:error, :tenant_db_too_many_connections}

{:error, {:shutdown, :tenant_db_too_many_connections}} ->
{:error, :tenant_db_too_many_connections}

{:error, :shutdown} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
{:error, :tenant_database_unavailable}

Expand Down Expand Up @@ -172,6 +178,9 @@ defmodule Realtime.Tenants.Connect do
log_error("TenantNotFound", "Tenant not found")
{:stop, :shutdown}

{:error, :tenant_db_too_many_connections} ->
{:stop, {:shutdown, :tenant_db_too_many_connections}}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:stop, :shutdown}
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
"""
alias Realtime.Database

@application_name "realtime_connect"
@behaviour Realtime.Tenants.Connect.Piper
@impl true
def run(acc) do
%{tenant: tenant} = acc

case Database.check_tenant_connection(tenant, @application_name) do
case Database.check_tenant_connection(tenant) do
{:ok, conn} ->
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}}

Expand Down
4 changes: 4 additions & 0 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ defmodule RealtimeWeb.RealtimeChannel do
msg = "Please increase your connection pool size"
Logging.log_error_message(:warning, "IncreaseConnectionPool", msg)

{:error, :tenant_db_too_many_connections} ->
msg = "Database can't accept more connections, Realtime won't connect"
Logging.log_error_message(:warning, "DatabaseLackOfConnections", msg)

{:error, :unable_to_set_policies, error} ->
Logging.log_error_message(:warning, "UnableToSetPolicies", error)

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.15",
version: "2.34.17",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
42 changes: 42 additions & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,48 @@ defmodule Realtime.DatabaseTest do
%{tenant: tenant}
end

describe "check_tenant_connection/1" do
  # Builds a tenant whose pool sizes are driven by the test's tags
  # (`db_pool`, `subcriber_pool_size`, `subs_pool_size`). A tag that is not set
  # yields `nil` for the corresponding setting.
  #
  # The context keys read here must match the tag names used on the tests
  # below; reading a different key silently drops the tag's value.
  # NOTE(review): "subcriber" (sic) is kept as-is because it is the settings
  # key used throughout this change; confirm against the settings schema.
  setup context do
    extensions = [
      %{
        "type" => "postgres_cdc_rls",
        "settings" => %{
          "db_host" => "localhost",
          "db_name" => "postgres",
          "db_user" => "supabase_admin",
          "db_password" => "postgres",
          "db_port" => "5433",
          "poll_interval" => 100,
          "poll_max_changes" => 100,
          "poll_max_record_bytes" => 1_048_576,
          "region" => "us-east-1",
          "ssl_enforced" => false,
          "db_pool" => Map.get(context, :db_pool),
          "subcriber_pool_size" => Map.get(context, :subcriber_pool_size),
          "subs_pool_size" => Map.get(context, :subs_pool_size)
        }
      }
    ]

    tenant = tenant_fixture(%{extensions: extensions})

    %{tenant: tenant}
  end

  test "connects to a tenant database", %{tenant: tenant} do
    assert {:ok, _} = Database.check_tenant_connection(tenant)
  end

  # Connection limit for docker tenant db is 100
  @tag db_pool: 50,
       subs_pool_size: 50,
       subcriber_pool_size: 50
  test "restricts connection if tenant database cannot receive more connections based on tenant pool",
       %{tenant: tenant} do
    assert {:error, :tenant_db_too_many_connections} = Database.check_tenant_connection(tenant)
  end
end

describe "replication_slot_teardown/1" do
test "removes replication slots with the realtime prefix", %{tenant: tenant} do
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
Expand Down
34 changes: 4 additions & 30 deletions test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Realtime.RepoTest do
import Ecto.Query

alias Realtime.Api.Message
alias Realtime.Crypto
alias Realtime.Repo
alias Realtime.Database
alias Realtime.Tenants.Migrations
Expand Down Expand Up @@ -328,34 +327,9 @@ defmodule Realtime.RepoTest do
end

defp db_config() do
tenant = tenant_fixture()

%{
"db_host" => db_host,
"db_name" => db_name,
"db_password" => db_password,
"db_port" => db_port,
"db_user" => db_user
} = args = tenant.extensions |> hd() |> then(& &1.settings)

{host, port, name, user, pass} =
Crypto.decrypt_creds(
db_host,
db_port,
db_name,
db_user,
db_password
)

ssl_enforced = Database.default_ssl_param(args)

[
hostname: host,
port: port,
database: name,
password: pass,
username: user,
ssl_enforced: ssl_enforced
]
tenant_fixture()
|> Realtime.Database.from_tenant("realtime_test")
|> Map.to_list()
|> Keyword.new()
end
end
28 changes: 28 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,34 @@ defmodule Realtime.Tenants.ConnectTest do
test "if tenant does not exist, does nothing" do
assert :ok = Connect.shutdown("none")
end

test "tenant not able to connect if database is not able to handle it" do
  # Every pool is sized at 100 so the tenant's requirement is expected to
  # exceed what the test database can offer.
  # NOTE(review): relies on the docker tenant db connection limit — confirm.
  settings = %{
    "db_host" => "localhost",
    "db_name" => "postgres",
    "db_user" => "supabase_admin",
    "db_password" => "postgres",
    "db_port" => "5433",
    "poll_interval" => 100,
    "poll_max_changes" => 100,
    "poll_max_record_bytes" => 1_048_576,
    "region" => "us-east-1",
    "ssl_enforced" => false,
    "db_pool" => 100,
    "subcriber_pool_size" => 100,
    "subs_pool_size" => 100
  }

  cdc_extension = %{"type" => "postgres_cdc_rls", "settings" => settings}
  tenant = tenant_fixture(%{extensions: [cdc_extension]})

  result = Connect.lookup_or_start_connection(tenant.external_id)

  assert {:error, :tenant_db_too_many_connections} = result
end
end

defp check_db_connections_created(test_pid, tenant_id) do
Expand Down
32 changes: 31 additions & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
import ExUnit.CaptureLog

alias Phoenix.Socket
alias Realtime.Tenants.Authorization
alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.Joken.CurrentTime
alias RealtimeWeb.UserSocket
Expand Down Expand Up @@ -218,6 +217,37 @@ defmodule RealtimeWeb.RealtimeChannelTest do
assert {:error, %{reason: "Realtime was unable to connect to the project database"}} =
subscribe_and_join(socket, "realtime:test", %{})
end

test "lack of connections halts join" do
  # Every pool is sized at 100 so the tenant's requirement is expected to
  # exceed what the test database can offer, making the channel join fail.
  # NOTE(review): relies on the docker tenant db connection limit — confirm.
  settings = %{
    "db_host" => "localhost",
    "db_name" => "postgres",
    "db_user" => "supabase_admin",
    "db_password" => "postgres",
    "db_port" => "5433",
    "poll_interval" => 100,
    "poll_max_changes" => 100,
    "poll_max_record_bytes" => 1_048_576,
    "region" => "us-east-1",
    "ssl_enforced" => false,
    "db_pool" => 100,
    "subcriber_pool_size" => 100,
    "subs_pool_size" => 100
  }

  cdc_extension = %{"type" => "postgres_cdc_rls", "settings" => settings}
  tenant = tenant_fixture(%{extensions: [cdc_extension]})
  socket_opts = conn_opts(tenant.external_id)

  {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, socket_opts)

  join_result = subscribe_and_join(socket, "realtime:test", %{})

  assert {:error, %{reason: "Database can't accept more connections, Realtime won't connect"}} =
           join_result
end
end

defp conn_opts(tenant_id \\ @tenant_external_id, claims \\ %{}) do
Expand Down

0 comments on commit 984597c

Please sign in to comment.