Skip to content

Commit

Permalink
fix: Separate read and write authorization and add telemetry
Browse files Browse the repository at this point in the history
* Separates read and write flows for authorization to ensure we are able to only check writes when user tries to broadcast or track presence
* Adds telemetry to Database.transaction so we can check latency of queries
* Add latency on Authorization queries
  • Loading branch information
filipecabaco committed Jan 22, 2025
1 parent 5f1c769 commit eca8a57
Show file tree
Hide file tree
Showing 12 changed files with 360 additions and 102 deletions.
14 changes: 13 additions & 1 deletion lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Realtime.Database do
alias Realtime.Crypto
alias Realtime.PostgresCdc
alias Realtime.Rpc
alias Realtime.Telemetry

defstruct [
:hostname,
Expand Down Expand Up @@ -151,7 +152,18 @@ defmodule Realtime.Database do
end

defp transaction_catched(db_conn, func, opts, metadata) do
Postgrex.transaction(db_conn, func, opts)
telemetry = Keyword.get(opts, :telemetry, nil)

if telemetry do
{latency, value} =
:timer.tc(fn -> Postgrex.transaction(db_conn, func, opts) end, :millisecond)

Telemetry.execute(telemetry, %{latency: latency}, %{})

value
else
Postgrex.transaction(db_conn, func, opts)
end
rescue
e ->
log_error("ErrorExecutingTransaction", e, metadata)
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ defmodule Realtime.Rpc do
@moduledoc """
RPC module for Realtime with the intent of standardizing the RPC interface and collect telemetry
"""
alias Realtime.Telemetry
import Realtime.Logs
alias Realtime.Telemetry

@doc """
Calls external node using :rpc.call/5 and collects telemetry
Expand Down
172 changes: 121 additions & 51 deletions lib/realtime/tenants/authorization.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ defmodule Realtime.Tenants.Authorization do
Check more information at Realtime.Tenants.Authorization.Policies
"""
require Logger
import Ecto.Query

alias Phoenix.Socket
alias Plug.Conn
alias Realtime.Api.Message
alias Realtime.Database
alias Realtime.Api.Message
alias Realtime.Database
alias Realtime.Repo
alias Realtime.Tenants.Authorization.Policies

Expand Down Expand Up @@ -48,31 +51,78 @@ defmodule Realtime.Tenants.Authorization do
end

@doc """
Runs validations based on RLS policies to set policies for a given connection (either Phoenix.Socket or Plug.Conn).
Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Plug.Conn).
"""
@spec get_authorizations(Phoenix.Socket.t() | Plug.Conn.t(), DBConnection.t(), __MODULE__.t()) ::
{:ok, Phoenix.Socket.t() | Plug.Conn.t()} | {:error, any()}
def get_authorizations(%Phoenix.Socket{} = socket, db_conn, authorization_context) do
case get_authorizations(db_conn, authorization_context) do
{:ok, %Policies{} = policies} -> {:ok, Phoenix.Socket.assign(socket, :policies, policies)}
@spec get_read_authorizations(
Socket.t() | Conn.t(),
DBConnection.t(),
__MODULE__.t(),
Policies.t()
) ::
{:ok, Socket.t() | Conn.t()} | {:error, any()}
def get_read_authorizations(socket, db_conn, authorization_context, policies \\ %Policies{})

def get_read_authorizations(
%Socket{} = socket,
db_conn,
authorization_context,
policies
) do
policies = Map.get(socket.assigns, :policies, %Policies{}) || policies

case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:error, error} -> {:error, error}
end
end

def get_authorizations(%Plug.Conn{} = conn, db_conn, authorization_context) do
case get_authorizations(db_conn, authorization_context) do
{:ok, %Policies{} = policies} -> {:ok, Plug.Conn.assign(conn, :policies, policies)}
def get_read_authorizations(%Conn{} = conn, db_conn, authorization_context, policies) do
policies = Map.get(conn.assigns, :policies, %Policies{}) || policies

case get_read_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:error, error} -> {:error, error}
end
end

@doc """
Runs validations based on RLS policies and returns the policies
Runs validations based on RLS policies to set policies for read policies a given connection (either Phoenix.Socket or Conn).
"""
@spec get_authorizations(DBConnection.t(), __MODULE__.t()) ::
{:ok, %Policies{}} | {:error, any()}
def get_authorizations(db_conn, authorization_context) do
case get_policies_for_connection(db_conn, authorization_context) do
@spec get_write_authorizations(
Socket.t() | Conn.t() | pid(),
DBConnection.t(),
__MODULE__.t(),
Policies.t()
) ::
{:ok, Socket.t() | Conn.t() | Policies.t()} | {:error, any()}
def get_write_authorizations(socket, db_conn, authorization_context, policies \\ %Policies{})

def get_write_authorizations(
%Socket{} = socket,
db_conn,
authorization_context,
policies
) do
policies = Map.get(socket.assigns, :policies, %Policies{}) || policies

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Socket.assign(socket, :policies, policies)}
{:error, error} -> {:error, error}
end
end

def get_write_authorizations(%Conn{} = conn, db_conn, authorization_context, policies) do
policies = Map.get(conn.assigns, :policies, %Policies{}) || policies

case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, Conn.assign(conn, :policies, policies)}
{:error, error} -> {:error, error}
end
end

def get_write_authorizations(db_conn, db_conn, authorization_context, policies)
when is_pid(db_conn) do
case get_write_policies_for_connection(db_conn, authorization_context, policies) do
{:ok, %Policies{} = policies} -> {:ok, policies}
{:error, error} -> {:error, error}
end
Expand Down Expand Up @@ -113,44 +163,64 @@ defmodule Realtime.Tenants.Authorization do
)
end

defp get_policies_for_connection(conn, authorization_context) do
Database.transaction(conn, fn transaction_conn ->
messages = [
Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :broadcast}),
Message.changeset(%Message{}, %{topic: authorization_context.topic, extension: :presence})
]

{:ok, messages} = Repo.insert_all_entries(transaction_conn, messages, Message)

{[%{id: broadcast_id}], [%{id: presence_id}]} =
Enum.split_with(messages, &(&1.extension == :broadcast))

set_conn_config(transaction_conn, authorization_context)
policies = %Policies{}

policies =
get_read_policy_for_connection_and_extension(
transaction_conn,
authorization_context,
broadcast_id,
presence_id,
policies
)

policies =
get_write_policy_for_connection_and_extension(
transaction_conn,
authorization_context,
policies
)

Postgrex.query!(transaction_conn, "ROLLBACK AND CHAIN", [])

policies
end)
defp get_read_policies_for_connection(conn, authorization_context, policies) do
Database.transaction(
conn,
fn transaction_conn ->
messages = [
Message.changeset(%Message{}, %{
topic: authorization_context.topic,
extension: :broadcast
}),
Message.changeset(%Message{}, %{
topic: authorization_context.topic,
extension: :presence
})
]

{:ok, messages} = Repo.insert_all_entries(transaction_conn, messages, Message)

{[%{id: broadcast_id}], [%{id: presence_id}]} =
Enum.split_with(messages, &(&1.extension == :broadcast))

set_conn_config(transaction_conn, authorization_context)

policies =
get_read_policy_for_connection_and_extension(
transaction_conn,
authorization_context,
broadcast_id,
presence_id,
policies
)

Postgrex.query!(transaction_conn, "ROLLBACK AND CHAIN", [])
policies
end,
telemetry: [:realtime, :tenants, :read_authorization_check]
)
end

import Ecto.Query
defp get_write_policies_for_connection(conn, authorization_context, policies) do
Database.transaction(
conn,
fn transaction_conn ->
set_conn_config(transaction_conn, authorization_context)

policies =
get_write_policy_for_connection_and_extension(
transaction_conn,
authorization_context,
policies
)

Postgrex.query!(transaction_conn, "ROLLBACK AND CHAIN", [])

policies
end,
telemetry: [:realtime, :tenants, :write_authorization_check]
)
end

defp get_read_policy_for_connection_and_extension(
conn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ defmodule Realtime.Tenants.Authorization.Policies.BroadcastPolicies do
"""
require Logger

defstruct read: false, write: false
defstruct read: nil, write: nil

@type t :: %__MODULE__{
read: boolean(),
write: boolean()
read: boolean() | nil,
write: boolean() | nil
}
end
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ defmodule Realtime.Tenants.Authorization.Policies.PresencePolicies do
"""
require Logger

defstruct read: false, write: false
defstruct read: nil, write: nil

@type t :: %__MODULE__{
read: boolean(),
write: boolean()
read: boolean() | nil,
write: boolean() | nil
}
end
2 changes: 1 addition & 1 deletion lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
defp permissions_for_message(auth_params, {:ok, db_conn}, topic) do
with auth_params = Map.put(auth_params, :topic, topic),
auth_params = Authorization.build_authorization_params(auth_params),
{:ok, policies} <- Authorization.get_authorizations(db_conn, auth_params) do
{:ok, policies} <- Authorization.get_write_authorizations(db_conn, db_conn, auth_params) do
policies
else
{:error, :not_found} -> nil
Expand Down
37 changes: 22 additions & 15 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ defmodule RealtimeWeb.RealtimeChannel do
|> assign_counter()
|> assign(:using_broadcast?, !!params["config"]["broadcast"])
|> assign(:check_authorization?, !!params["config"]["private"])
|> assign(:policies, nil)

start_db_rate_counter(tenant_id)

Expand All @@ -60,7 +61,8 @@ defmodule RealtimeWeb.RealtimeChannel do
:ok <- limit_max_users(socket.assigns),
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, access_token, claims, socket) do
socket = assign_authorization_context(socket, sub_topic, access_token, claims),
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
public? = !socket.assigns.check_authorization?
is_new_api = is_new_api(params)
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, public?)
Expand Down Expand Up @@ -380,8 +382,8 @@ defmodule RealtimeWeb.RealtimeChannel do
socket = assign(socket, :access_token, refresh_token)

with {:ok, claims, confirm_token_ref, _, socket} <- confirm_token(socket),
{:ok, socket} <-
maybe_assign_policies(channel_name, db_conn, access_token, claims, socket) do
socket = assign_authorization_context(socket, channel_name, access_token, claims),
{:ok, socket} <- maybe_assign_policies(channel_name, db_conn, socket) do
Helpers.cancel_timer(pg_sub_ref)
pg_change_params = Enum.map(pg_change_params, &Map.put(&1, :claims, claims))

Expand Down Expand Up @@ -696,26 +698,31 @@ defmodule RealtimeWeb.RealtimeChannel do
end)
end

defp assign_authorization_context(socket, topic, access_token, claims) do
authorization_context =
Authorization.build_authorization_params(%{
topic: topic,
headers: Map.get(socket.assigns, :headers, []),
jwt: access_token,
claims: claims,
role: claims["role"]
})

assign(socket, :authorization_context, authorization_context)
end

defp maybe_assign_policies(
topic,
db_conn,
access_token,
claims,
%{assigns: %{check_authorization?: true}} = socket
)
when not is_nil(topic) do
%{using_broadcast?: using_broadcast?} = socket.assigns

authorization_context =
Authorization.build_authorization_params(%{
topic: topic,
headers: socket.assigns.headers,
jwt: access_token,
claims: claims,
role: claims["role"]
})
authorization_context = socket.assigns.authorization_context

with {:ok, socket} <- Authorization.get_authorizations(socket, db_conn, authorization_context) do
with {:ok, socket} <-
Authorization.get_read_authorizations(socket, db_conn, authorization_context) do
cond do
match?(%Policies{broadcast: %BroadcastPolicies{read: false}}, socket.assigns.policies) ->
{:error, :unauthorized,
Expand All @@ -736,7 +743,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp maybe_assign_policies(_, _, _, _, socket) do
defp maybe_assign_policies(_, _, socket) do
{:ok, assign(socket, policies: nil)}
end

Expand Down
Loading

0 comments on commit eca8a57

Please sign in to comment.