Skip to content

Commit f0bbc4b

Browse files
authored
Add a connection listener that emits telemetry events (#311)
* Add a connection listener that emits telemetry events * Add documentation * Address review comment * Add usage instructions * Fix wrong module name * Documentation improvements
1 parent 700cbf6 commit f0bbc4b

File tree

3 files changed

+229
-0
lines changed

3 files changed

+229
-0
lines changed

integration_test/cases/connection_listeners_test.exs

+112
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,116 @@ defmodule ConnectionListenersTest do
203203
assert is_pid(conn3)
204204
refute conn1 == conn2 == conn3
205205
end
206+
207+
describe "telemetry listener" do
208+
test "emits events with no tag" do
209+
attach_telemetry_forwarding_handler()
210+
err = RuntimeError.exception("oops")
211+
212+
stack = [
213+
{:ok, :state},
214+
{:disconnect, err, :discon},
215+
:ok,
216+
{:error, err}
217+
]
218+
219+
{:ok, agent} = A.start_link(stack)
220+
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()
221+
222+
{:ok, pool} =
223+
P.start_link(
224+
agent: agent,
225+
parent: self(),
226+
connection_listeners: [telemetry_listener],
227+
backoff_min: 1_000
228+
)
229+
230+
assert_receive {:telemetry, :connected, %{tag: nil}}
231+
assert P.close(pool, %Q{})
232+
assert_receive {:telemetry, :disconnected, %{tag: nil}}
233+
after
234+
detach_telemetry_forwarding_handler()
235+
end
236+
237+
test "emits events with tag" do
238+
attach_telemetry_forwarding_handler()
239+
err = RuntimeError.exception("oops")
240+
241+
stack = [
242+
{:ok, :state},
243+
{:disconnect, err, :discon},
244+
:ok,
245+
{:error, err}
246+
]
247+
248+
{:ok, agent} = A.start_link(stack)
249+
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()
250+
251+
tag = make_ref()
252+
253+
{:ok, pool} =
254+
P.start_link(
255+
agent: agent,
256+
parent: self(),
257+
connection_listeners: {[telemetry_listener], tag},
258+
backoff_min: 1_000
259+
)
260+
261+
assert_receive {:telemetry, :connected, %{tag: ^tag}}
262+
assert P.close(pool, %Q{})
263+
assert_receive {:telemetry, :disconnected, %{tag: ^tag}}
264+
after
265+
detach_telemetry_forwarding_handler()
266+
end
267+
268+
test "handles non-graceful disconnects" do
269+
attach_telemetry_forwarding_handler()
270+
271+
stack = [
272+
fn opts ->
273+
send(opts[:parent], {:hi, self()})
274+
{:ok, :state}
275+
end,
276+
{:ok, :state}
277+
]
278+
279+
{:ok, agent} = A.start_link(stack)
280+
{:ok, telemetry_listener} = DBConnection.TelemetryListener.start_link()
281+
282+
{:ok, _pool} =
283+
P.start_link(
284+
agent: agent,
285+
parent: self(),
286+
connection_listeners: [telemetry_listener],
287+
backoff_min: 1_000
288+
)
289+
290+
assert_receive {:hi, pid}
291+
Process.exit(pid, :kill)
292+
293+
assert_receive {:telemetry, :disconnected, %{pid: ^pid}}
294+
after
295+
detach_telemetry_forwarding_handler()
296+
end
297+
end
298+
299+
defp attach_telemetry_forwarding_handler() do
300+
test_pid = self()
301+
302+
:telemetry.attach_many(
303+
"TestHandler",
304+
[
305+
[:db_connection, :connected],
306+
[:db_connection, :disconnected]
307+
],
308+
fn [:db_connection, action], _, metadata, _ ->
309+
send(test_pid, {:telemetry, action, metadata})
310+
end,
311+
%{}
312+
)
313+
end
314+
315+
defp detach_telemetry_forwarding_handler() do
316+
:telemetry.detach("TestHandler")
317+
end
206318
end

lib/db_connection.ex

+8
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,12 @@ defmodule DBConnection do
524524
This feature is available since v2.6.0. Before this version `:connection_listeners` only
525525
accepted a list of listener processes.
526526
527+
## Telemetry listener
528+
529+
DBConnection provides a connection listener that emits telemetry events upon
530+
connection and disconnection, see the `DBConnection.TelemetryListener` module
531+
for more info.
532+
527533
## Connection Configuration Callback
528534
529535
The `:configure` function will be called before each individual connection to the
@@ -560,6 +566,8 @@ defmodule DBConnection do
560566
561567
* `:opts` - All options given to the pool operation
562568
569+
See `DBConnection.TelemetryListener` for enabling `[:db_connection, :connected]`
570+
and `[:db_connection, :disconnected]` events.
563571
"""
564572
@spec start_link(module, [start_option()] | Keyword.t()) :: GenServer.on_start()
565573
def start_link(conn_mod, opts) do
+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
defmodule DBConnection.TelemetryListener do
2+
@moduledoc """
3+
A connection listener that emits telemetry events for connection and disconnection
4+
5+
It monitors connection processes and ensures that disconnection events are
6+
always emitted.
7+
8+
## Usage
9+
10+
Start the listener, and pass it under the `:connection_listeners` option when
11+
starting DBConnection:
12+
13+
{:ok, pid} = TelemetryListener.start_link()
14+
{:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: [pid])
15+
16+
# Using a tag, which will be sent in telemetry metadata
17+
{:ok, _conn} = DBConnection.start_link(SomeModule, connection_listeners: {[pid], :my_tag})
18+
19+
# Or, with a Supervisor:
20+
Supervisor.start_link([
21+
{TelemetryListener, [name: MyListener]},
22+
DBConnection.child_spec(SomeModule, connection_listeners: {[MyListener], :my_tag})
23+
])
24+
25+
26+
## Telemetry events
27+
28+
### Connected
29+
30+
`[:db_connection, :connected]` - Executed after a connection is established.
31+
32+
#### Measurements
33+
34+
* `:count` - Always 1
35+
36+
#### Metadata
37+
38+
* `:pid` - The connection pid
39+
* `:tag` - The connection pool tag
40+
41+
### Disconnected
42+
43+
`[:db_connection, :disconnected]` - Executed after a disconnect.
44+
45+
#### Measurements
46+
47+
* `:count` - Always 1
48+
49+
#### Metadata
50+
51+
* `:pid` - The connection pid
52+
* `:tag` - The connection pool tag
53+
"""
54+
55+
use GenServer
56+
57+
@doc "Starts a telemetry listener"
58+
@spec start_link(GenServer.options()) :: {:ok, pid()}
59+
def start_link(opts \\ []) do
60+
GenServer.start_link(__MODULE__, nil, opts)
61+
end
62+
63+
@impl GenServer
64+
def init(nil) do
65+
{:ok, %{monitoring: %{}}}
66+
end
67+
68+
@impl GenServer
69+
def handle_info({:connected, pid, tag}, state) do
70+
handle_connected(pid, tag, state)
71+
end
72+
73+
def handle_info({:connected, pid}, state) do
74+
handle_connected(pid, nil, state)
75+
end
76+
77+
def handle_info({:disconnected, pid, _}, state) do
78+
handle_disconnected(pid, state)
79+
end
80+
81+
def handle_info({:disconnected, pid}, state) do
82+
handle_disconnected(pid, state)
83+
end
84+
85+
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
86+
handle_disconnected(pid, state)
87+
end
88+
89+
defp handle_connected(pid, tag, state) do
90+
:telemetry.execute([:db_connection, :connected], %{count: 1}, %{tag: tag, pid: pid})
91+
ref = Process.monitor(pid)
92+
93+
{:noreply, put_in(state.monitoring[pid], {ref, tag})}
94+
end
95+
96+
defp handle_disconnected(pid, state) do
97+
case state.monitoring[pid] do
98+
# Already handled. We may receive two messages: one from monitor and one
99+
# from listener. For this reason, we need to handle both.
100+
nil ->
101+
{:noreply, state}
102+
103+
{ref, tag} ->
104+
Process.demonitor(ref, [:flush])
105+
:telemetry.execute([:db_connection, :disconnected], %{count: 1}, %{tag: tag, pid: pid})
106+
{:noreply, %{state | monitoring: Map.delete(state.monitoring, pid)}}
107+
end
108+
end
109+
end

0 commit comments

Comments
 (0)