Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Move logs to warnings; Add error for connection pool #1284

Merged
merged 4 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ This is the list of operational codes that can help you understand your deployme
| UnableToFindCounter | Error when trying to find a counter to track rate limits for a tenant |
| UnhandledProcessMessage | Unhandled message received by a Realtime process |
| UnableToSetPolicies | We were not able to set policies for this connection |
| IncreaseConnectionPool | The number of connections you have set for Realtime are not enough to handle your current use case |
| RlsPolicyError | Error on RLS policy used for authorization |
| ConnectionInitializing | Database is initializing connection |
| DatabaseConnectionIssue | Database had connection issues and connection was not able to be established |
| UnableToConnectToProject | Unable to connect to Project database |
Expand Down
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ config :tailwind,
# Configures Elixir's Logger
config :logger, :console,
format: "$time $metadata[$level] $message\n",
metadata: [:request_id, :project, :external_id, :application_name]
metadata: [:request_id, :project, :external_id, :application_name, :sub, :error_code]

# Use Jason for JSON parsing in Phoenix
config :phoenix, :json_library, Jason
Expand Down
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ config :joken,
current_time_adapter: RealtimeWeb.Joken.CurrentTime.Mock

# Print only errors during test
config :logger, level: :error
config :logger, level: :warning

# Configures Elixir's Logger
config :logger, :console,
Expand Down
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
msg =
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"

log_error("RealtimeDisabledForConfiguration", msg)
log_warning("RealtimeDisabledForConfiguration", msg)
rollback(conn, msg)

{:error, exception} ->
Expand Down
8 changes: 8 additions & 0 deletions lib/realtime/logs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@ defmodule Realtime.Logs do
def log_error(code, error, metadata \\ []) do
Logger.error("#{code}: #{to_log(error)}", [error_code: code] ++ metadata)
end

@doc """
Logs warning with a given Operational Code
"""
@spec log_error(String.t(), any(), keyword()) :: :ok
def log_warning(code, warning, metadata \\ []) do
Logger.warning("#{code}: #{to_log(warning)}", [{:error_code, code} | metadata])
end
end

defimpl Jason.Encoder, for: DBConnection.ConnectionError do
Expand Down
63 changes: 46 additions & 17 deletions lib/realtime/tenants/authorization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Realtime.Tenants.Authorization do
alias Realtime.Database
alias Realtime.Repo
alias Realtime.Tenants.Authorization.Policies

alias DBConnection.ConnectionError
defstruct [:topic, :headers, :jwt, :claims, :role]

@type t :: %__MODULE__{
Expand Down Expand Up @@ -54,14 +54,16 @@ defmodule Realtime.Tenants.Authorization do
Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Plug.Conn).
"""
@spec get_read_authorizations(Socket.t() | Conn.t(), pid(), __MODULE__.t()) ::
{:ok, Socket.t() | Conn.t()} | {:error, any()}
{:ok, Socket.t() | Conn.t()} | {:error, any()} | {:error, :rls_policy_error, any()}

def get_read_authorizations(%Socket{} = socket, db_conn, authorization_context) do
policies = Map.get(socket.assigns, :policies) || %Policies{}

with {:ok, %Policies{} = policies} <-
get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, Socket.assign(socket, :policies, policies)}
case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end

Expand All @@ -70,6 +72,8 @@ defmodule Realtime.Tenants.Authorization do

case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand All @@ -78,7 +82,9 @@ defmodule Realtime.Tenants.Authorization do
Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Conn).
"""
@spec get_write_authorizations(Socket.t() | Conn.t() | pid(), pid(), __MODULE__.t()) ::
{:ok, Socket.t() | Conn.t() | Policies.t()} | {:error, any()}
{:ok, Socket.t() | Conn.t() | Policies.t()}
| {:error, any()}
| {:error, :rls_policy_error, any()}

def get_write_authorizations(
%Socket{} = socket,
Expand All @@ -89,6 +95,8 @@ defmodule Realtime.Tenants.Authorization do

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand All @@ -98,13 +106,17 @@ defmodule Realtime.Tenants.Authorization do

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end

def get_write_authorizations(db_conn, db_conn, authorization_context) when is_pid(db_conn) do
case get_write_policies_for_connection(db_conn, authorization_context, %Policies{}) do
{:ok, %Policies{} = policies} -> {:ok, policies}
{:ok, {:error, %Postgrex.Error{} = error}} -> {:error, :rls_policy_error, error}
{:error, %ConnectionError{reason: :queue_timeout}} -> {:error, :increase_connection_pool}
{:error, error} -> {:error, error}
end
end
Expand Down Expand Up @@ -217,13 +229,14 @@ defmodule Realtime.Tenants.Authorization do
or_where: [extension: :presence, id: ^presence_id]
)

{:ok, res} = Repo.all(conn, query, Message)
can_presence? = Enum.any?(res, fn %{id: id} -> id == presence_id end)
can_broadcast? = Enum.any?(res, fn %{id: id} -> id == broadcast_id end)
with {:ok, res} <- Repo.all(conn, query, Message) do
can_presence? = Enum.any?(res, fn %{id: id} -> id == presence_id end)
can_broadcast? = Enum.any?(res, fn %{id: id} -> id == broadcast_id end)

policies
|> Policies.update_policies(:presence, :read, can_presence?)
|> Policies.update_policies(:broadcast, :read, can_broadcast?)
policies
|> Policies.update_policies(:presence, :read, can_presence?)
|> Policies.update_policies(:broadcast, :read, can_broadcast?)
end
end

defp get_write_policy_for_connection_and_extension(
Expand All @@ -237,11 +250,27 @@ defmodule Realtime.Tenants.Authorization do
presence_changeset =
Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :presence})

broadcast_result = Repo.insert(conn, broadcast_changeset, Message, mode: :savepoint)
presence_result = Repo.insert(conn, presence_changeset, Message, mode: :savepoint)
policies =
case Repo.insert(conn, broadcast_changeset, Message, mode: :savepoint) do
{:ok, _} ->
Policies.update_policies(policies, :broadcast, :write, true)

policies
|> Policies.update_policies(:presence, :write, match?({:ok, _}, presence_result))
|> Policies.update_policies(:broadcast, :write, match?({:ok, _}, broadcast_result))
{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
Policies.update_policies(policies, :broadcast, :write, false)

e ->
e
end

case Repo.insert(conn, presence_changeset, Message, mode: :savepoint) do
{:ok, _} ->
Policies.update_policies(policies, :presence, :write, true)

{:error, %Postgrex.Error{postgres: %{code: :insufficient_privilege}}} ->
Policies.update_policies(policies, :presence, :write, false)

e ->
e
end
end
end
2 changes: 1 addition & 1 deletion lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Realtime.Tenants.Migrations do
@moduledoc """
Run Realtime database migrations for tenant's database.
"""
use GenServer
use GenServer, restart: :transient

require Logger

Expand Down
42 changes: 25 additions & 17 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, state, assign(socket, assigns)}
else
{:error, :expired_token, msg} ->
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :missing_claims} ->
msg = "Fields `role` and `exp` are required in JWT"
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :expected_claims_map} ->
msg = "Token claims must be a map"
Logging.log_error_message(:error, "InvalidJWTToken", msg)
Logging.log_error_message(:warning, "InvalidJWTToken", msg)

{:error, :unauthorized, msg} ->
Logging.log_error_message(:error, "Unauthorized", msg)
Logging.log_error_message(:warning, "Unauthorized", msg)

{:error, :too_many_channels} ->
msg = "Too many channels"
Expand All @@ -129,6 +129,13 @@ defmodule RealtimeWeb.RealtimeChannel do
msg = "Too many joins per second"
Logging.log_error_message(:error, "ClientJoinRateLimitReached", msg)

{:error, :increase_connection_pool} ->
msg = "Please increase your connection pool size"
Logging.log_error_message(:warning, "IncreaseConnectionPool", msg)

{:error, :unable_to_set_policies, error} ->
Logging.log_error_message(:warning, "UnableToSetPolicies", error)

{:error, :tenant_database_unavailable} ->
Logging.log_error_message(
:error,
Expand Down Expand Up @@ -180,13 +187,6 @@ defmodule RealtimeWeb.RealtimeChannel do
"Realtime is restarting, please standby"
)

{:error, :unable_to_set_policies} ->
Logging.log_error_message(
:error,
"UnableToSetPolicies",
"Unable to set policies for connection"
)

{:error, error} ->
Logging.log_error_message(:error, "UnknownErrorOnChannel", error)
end
Expand Down Expand Up @@ -284,7 +284,7 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, nil)}

error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand All @@ -294,13 +294,13 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())}

error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
rescue
error ->
log_error("UnableToSubscribeToPostgres", error)
log_warning("UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand Down Expand Up @@ -585,8 +585,8 @@ defmodule RealtimeWeb.RealtimeChannel do
defp shutdown_response(socket, message) when is_binary(message) do
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
metadata = log_metadata(access_token)
log_error("ChannelShutdown", message, metadata)
push_system_message("system", socket, "error", message, channel_name)
log_warning("ChannelShutdown", message, metadata)
{:stop, :shutdown, socket}
end

Expand Down Expand Up @@ -749,9 +749,17 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, socket}
end
else
{:error, :increase_connection_pool} ->
{:error, :increase_connection_pool}

{:error, :rls_policy_error, error} ->
log_error("RlsPolicyError", error)

{:error, :unauthorized,
"You do not have permissions to read from this Channel topic: #{topic}"}

{:error, error} ->
log_error("UnableToSetPolicies", error)
{:error, :unable_to_set_policies}
{:error, :unable_to_set_policies, error}
end
end

Expand Down
22 changes: 16 additions & 6 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
"""
require Logger
import Phoenix.Socket, only: [assign: 3]
import Realtime.Logs

alias Phoenix.Socket
alias Realtime.GenCounter
Expand All @@ -29,14 +30,23 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
}
} = socket
) do
{:ok, socket} = run_authorization_check(socket, db_conn, authorization_context)
with {:ok, %{assigns: %{policies: policies}}} <-
run_authorization_check(socket, db_conn, authorization_context) do
case policies do
%Policies{broadcast: %BroadcastPolicies{write: false}} ->
Logger.info("Broadcast message ignored on #{tenant_topic}")

case socket.assigns.policies do
%Policies{broadcast: %BroadcastPolicies{write: false}} ->
Logger.info("Broadcast message ignored on #{tenant_topic}")
_ ->
send_message(self_broadcast, tenant_topic, payload)
end
else
{:error, :increase_connection_pool} ->
log_error("IncreaseConnectionPool", "Please increase your connection pool size")
{:error, :unable_to_set_policies}

_ ->
send_message(self_broadcast, tenant_topic, payload)
{:error, error} ->
log_error("UnableToSetPolicies", error)
{:error, :unable_to_set_policies}
end

socket = increment_rate_counter(socket)
Expand Down
11 changes: 5 additions & 6 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do
) :: {:error, %{reason: binary()}}
def log_error_message(level, code, error, metadata \\ [])

def log_error_message(:warning, _code, error, metadata) do
error_msg = "Start channel error: " <> to_log(error)
Logger.warning(error_msg, metadata)
{:error, %{reason: error_msg}}
end

def log_error_message(:error, code, error, metadata) do
log_error(code, error, metadata)
{:error, %{reason: error}}
end

def log_error_message(:warning, code, error, metadata) do
log_warning(code, error, metadata)
{:error, %{reason: error}}
end
end
10 changes: 5 additions & 5 deletions lib/realtime_web/channels/user_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ defmodule RealtimeWeb.UserSocket do
{:error, :tenant_not_found}

{:error, :expired_token, msg} ->
log_error_with_token_metadata(msg, token)
log_warning_with_token_metadata(msg, token)
{:error, :expired_token}

{:error, :missing_claims} ->
log_error_with_token_metadata("Fields `role` and `exp` are required in JWT", token)
log_warning_with_token_metadata("Fields `role` and `exp` are required in JWT", token)
{:error, :missing_claims}

error ->
Expand All @@ -108,14 +108,14 @@ defmodule RealtimeWeb.UserSocket do
end
end

defp log_error_with_token_metadata(msg, token) do
defp log_warning_with_token_metadata(msg, token) do
case Joken.peek_claims(token) do
{:ok, claims} ->
sub = Map.get(claims, "sub")
log_error("InvalidJWTToken", msg, sub: sub)
log_warning("InvalidJWTToken", msg, sub: sub)

_ ->
log_error("InvalidJWTToken", msg)
log_warning("InvalidJWTToken", msg)
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.10",
version: "2.34.11",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading
Loading