Skip to content

Commit 5d088f7

Browse files
authored
Proxy info messages to the adapter (#316)
1 parent c6dcdf9 commit 5d088f7

File tree

5 files changed

+137
-6
lines changed

5 files changed

+137
-6
lines changed

.formatter.exs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
# Used by "mix format"
22
[
3-
inputs: ["{mix,.formatter}.exs", "{config,lib,test,integration_test}/**/*.{ex,exs}"]
3+
inputs: ["{mix,.formatter}.exs", "{config,lib,test,examples,integration_test}/**/*.{ex,exs}"]
44
]

examples/tcp_connection/lib/tcp_connection.ex

+17
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ defmodule TCPConnection do
6464

6565
case :gen_tcp.connect(host, port, socket_opts, timeout) do
6666
{:ok, sock} ->
67+
# Monitor the socket so we can react to it being closed. See handle_info/2.
68+
_ref = :inet.monitor(sock)
6769
{:ok, {sock, <<>>}}
6870

6971
{:error, reason} ->
@@ -143,6 +145,21 @@ defmodule TCPConnection do
143145
end
144146
end
145147

148+
# The handle_info callback is optional and can be removed if not needed.
149+
# Here it is used to react to `:inet.monitor/1` messages which arrive
150+
# when socket gets closed while the connection is idle.
151+
def handle_info({:DOWN, _ref, _type, sock, _info}, {sock, _buffer}) do
152+
{:disconnect, TCPConnection.Error.exception({:idle, :closed})}
153+
end
154+
155+
def handle_info(msg, state) do
156+
Logger.info(fn ->
157+
["#{__MODULE__} (", inspect(self()), ") missed message: ", inspect(msg)]
158+
end)
159+
160+
:ok
161+
end
162+
146163
@impl true
147164
def handle_close(_, _, s) do
148165
{:ok, nil, s}

integration_test/cases/info_test.exs

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
defmodule InfoTest do
2+
use ExUnit.Case, async: true
3+
4+
alias TestPool, as: P
5+
alias TestAgent, as: A
6+
alias TestQuery, as: Q
7+
8+
test "handle_info handles harmless message and moves on" do
9+
stack = [
10+
fn opts ->
11+
send(opts[:parent], {:connected, self()})
12+
{:ok, :state}
13+
end,
14+
:ok,
15+
{:idle, :state},
16+
{:idle, :state}
17+
]
18+
19+
{:ok, agent} = A.start_link(stack)
20+
{:ok, pool} = P.start_link(agent: agent, parent: self())
21+
22+
assert_receive {:connected, conn}
23+
send(conn, "some harmless message")
24+
assert P.run(pool, fn _ -> :result end) == :result
25+
26+
assert [
27+
connect: _,
28+
handle_info: _,
29+
handle_status: _,
30+
handle_status: _
31+
] = A.record(agent)
32+
end
33+
34+
test "handle_info can force disconnect" do
35+
stack = [
36+
fn opts ->
37+
send(opts[:parent], {:connected, self()})
38+
{:ok, :state}
39+
end,
40+
{:disconnect, RuntimeError.exception("TCP connection just closed")},
41+
:ok,
42+
fn opts ->
43+
send(opts[:parent], :reconnected)
44+
{:ok, :state}
45+
end
46+
]
47+
48+
{:ok, agent} = A.start_link(stack)
49+
P.start_link(agent: agent, parent: self())
50+
51+
assert_receive {:connected, conn}
52+
send(conn, "monitor says TCP connection just closed")
53+
assert_receive :reconnected
54+
55+
assert [
56+
connect: _,
57+
handle_info: _,
58+
disconnect: _,
59+
connect: _
60+
] = A.record(agent)
61+
end
62+
63+
test "handle_info's disconnect while checked out client crashes is no-op" do
64+
stack = [
65+
fn _opts ->
66+
{:ok, %{conn_pid: self()}}
67+
end,
68+
fn _query, _params, _opts, %{conn_pid: conn_pid} ->
69+
send(conn_pid, "monitor says TCP connection just closed")
70+
71+
# This waits for the info message to be processed.
72+
:sys.get_state(conn_pid)
73+
74+
{:disconnect, RuntimeError.exception("TCP connection is closed"), :new_state}
75+
end,
76+
{:disconnect, RuntimeError.exception("TCP connection just closed")},
77+
:ok,
78+
fn opts ->
79+
send(opts[:parent], :reconnected)
80+
{:ok, :state}
81+
end
82+
]
83+
84+
{:ok, agent} = A.start_link(stack)
85+
{:ok, pool} = P.start_link(agent: agent, parent: self())
86+
87+
assert {:error, %RuntimeError{message: "TCP connection is closed"}} =
88+
P.execute(pool, %Q{}, [])
89+
90+
assert_receive :reconnected
91+
92+
assert [
93+
connect: _,
94+
handle_execute: _,
95+
handle_info: _,
96+
disconnect: _,
97+
connect: _
98+
] = A.record(agent)
99+
end
100+
end

lib/db_connection/connection.ex

+15-5
Original file line numberDiff line numberDiff line change
@@ -330,12 +330,22 @@ defmodule DBConnection.Connection do
330330
handle_timeout(s)
331331
end
332332

333-
def handle_event(:info, msg, :no_state, %{mod: mod} = s) do
334-
Logger.info(fn ->
335-
[inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
336-
end)
333+
def handle_event(:info, msg, :no_state, %{mod: mod, state: state} = s) do
334+
if function_exported?(mod, :handle_info, 2) do
335+
case apply(mod, :handle_info, [msg, state]) do
336+
:ok ->
337+
handle_timeout(s)
338+
339+
{:disconnect, err} ->
340+
{:keep_state, s, {:next_event, :internal, {:disconnect, {:log, err}}}}
341+
end
342+
else
343+
Logger.info(fn ->
344+
[inspect(mod), ?\s, ?(, inspect(self()), ") missed message: " | inspect(msg)]
345+
end)
337346

338-
handle_timeout(s)
347+
handle_timeout(s)
348+
end
339349
end
340350

341351
@doc false

test/test_support.exs

+4
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ defmodule TestConnection do
150150
TestAgent.eval(:handle_deallocate, [query, cursor, opts, state])
151151
end
152152

153+
def handle_info(message, state) do
154+
TestAgent.eval(:handle_info, [message, state])
155+
end
156+
153157
defp put_agent_from_opts(opts) do
154158
Process.get(:agent) || Process.put(:agent, agent_from_opts(opts))
155159
end

0 commit comments

Comments
 (0)