Skip to content

Commit

Permalink
remove Mint timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslandoga committed Feb 7, 2025
1 parent 1275855 commit 4dac9e7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 30 deletions.
4 changes: 1 addition & 3 deletions lib/ch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Ch do
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.client_option()]}
| {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]}
| DBConnection.start_option()

@doc """
Expand All @@ -30,7 +30,6 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of ClickHouse settings
* `:timeout` - HTTP receive timeout in milliseconds
* `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
Expand Down Expand Up @@ -72,7 +71,6 @@ defmodule Ch do
* `:username` - Username
* `:password` - User password
* `:settings` - Keyword list of settings
* `:timeout` - Query request timeout
* `:command` - Command tag for the query
* `:headers` - Custom HTTP headers for the request
* `:format` - Custom response format for the request
Expand Down
46 changes: 21 additions & 25 deletions lib/ch/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Ch.Connection do
conn = maybe_reconnect(conn)
headers = [{"user-agent", @user_agent}]

case request(conn, "GET", "/ping", headers, _body = "", _opts = []) do
case request(conn, "GET", "/ping", headers, _body = []) do
{:ok, conn, _response} -> {:ok, conn}
{:error, error, conn} -> {:disconnect, error, conn}
{:disconnect, _error, _conn} = disconnect -> disconnect
Expand Down Expand Up @@ -88,30 +88,30 @@ defmodule Ch.Connection do
headers = headers(conn, extra_headers, opts)

with {:ok, conn, _ref} <- send_request(conn, "POST", path, headers, body),
{:ok, conn} <- eat_ok_status_and_headers(conn, timeout(conn, opts)) do
{:ok, conn} <- eat_ok_status_and_headers(conn) do
{:ok, query, %Result{command: command}, conn}
end
end

@spec eat_ok_status_and_headers(conn, timeout) ::
@spec eat_ok_status_and_headers(conn) ::
{:ok, %{conn: conn, buffer: [Mint.Types.response()]}}
| {:error, Ch.Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp eat_ok_status_and_headers(conn, timeout) do
case HTTP.recv(conn, 0, timeout) do
defp eat_ok_status_and_headers(conn) do
case HTTP.recv(conn, 0) do
{:ok, conn, responses} ->
case eat_ok_status_and_headers(responses) do
{:ok, data} ->
{:ok, %{conn: conn, buffer: data}}

:more ->
eat_ok_status_and_headers(conn, timeout)
eat_ok_status_and_headers(conn)

:error ->
all_responses_result =
case handle_all_responses(responses, []) do
{:ok, responses} -> {:ok, conn, responses}
{:more, acc} -> recv_all(conn, acc, timeout)
{:more, acc} -> recv_all(conn, acc)
end

with {:ok, conn, responses} <- all_responses_result do
Expand Down Expand Up @@ -148,8 +148,8 @@ defmodule Ch.Connection do
end
end

def handle_fetch(_query, result, opts, conn) do
case HTTP.recv(conn, 0, timeout(conn, opts)) do
def handle_fetch(_query, result, _opts, conn) do
case HTTP.recv(conn, 0) do
{:ok, conn, responses} ->
{halt_or_cont(responses), %Result{result | data: extract_data(responses)}, conn}

Expand Down Expand Up @@ -199,7 +199,7 @@ defmodule Ch.Connection do
{:ok, conn} ->
case body do
:eof ->
with {:ok, conn, responses} <- receive_full_response(conn, timeout(conn, opts)) do
with {:ok, conn, responses} <- receive_full_response(conn) do
{:ok, query, responses, conn}
end

Expand All @@ -219,7 +219,7 @@ defmodule Ch.Connection do
path = path(conn, query_params, opts)
headers = headers(conn, extra_headers, opts)

with {:ok, conn, responses} <- request(conn, "POST", path, headers, body, opts) do
with {:ok, conn, responses} <- request(conn, "POST", path, headers, body) do
{:ok, query, responses, conn}
end
end
Expand All @@ -232,13 +232,13 @@ defmodule Ch.Connection do

@typep response :: Mint.Types.status() | Mint.Types.headers() | binary

@spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) ::
@spec request(conn, binary, binary, Mint.Types.headers(), iodata) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp request(conn, method, path, headers, body, opts) do
defp request(conn, method, path, headers, body) do
with {:ok, conn, _ref} <- send_request(conn, method, path, headers, body) do
receive_full_response(conn, timeout(conn, opts))
receive_full_response(conn)
end
end

Expand All @@ -251,12 +251,12 @@ defmodule Ch.Connection do
end
end

@spec receive_full_response(conn, timeout) ::
@spec receive_full_response(conn) ::
{:ok, conn, [response]}
| {:error, Error.t(), conn}
| {:disconnect, Mint.Types.error(), conn}
defp receive_full_response(conn, timeout) do
with {:ok, conn, responses} <- recv_all(conn, [], timeout) do
defp receive_full_response(conn) do
with {:ok, conn, responses} <- recv_all(conn, []) do
case responses do
[200, headers | _rest] ->
conn = ensure_same_server(conn, headers)
Expand All @@ -275,14 +275,14 @@ defmodule Ch.Connection do
end
end

@spec recv_all(conn, [response], timeout()) ::
@spec recv_all(conn, [response]) ::
{:ok, conn, [response]} | {:disconnect, Mint.Types.error(), conn}
defp recv_all(conn, acc, timeout) do
case HTTP.recv(conn, 0, timeout) do
defp recv_all(conn, acc) do
case HTTP.recv(conn, 0, :infinity) do
{:ok, conn, responses} ->
case handle_all_responses(responses, acc) do
{:ok, responses} -> {:ok, conn, responses}
{:more, acc} -> recv_all(conn, acc, timeout)
{:more, acc} -> recv_all(conn, acc)
end

{:error, conn, reason, _responses} ->
Expand All @@ -302,9 +302,6 @@ defmodule Ch.Connection do
defp maybe_put_private(conn, _k, nil), do: conn
defp maybe_put_private(conn, k, v), do: HTTP.put_private(conn, k, v)

defp timeout(conn), do: HTTP.get_private(conn, :timeout)
defp timeout(conn, opts), do: Keyword.get(opts, :timeout) || timeout(conn)

defp settings(conn, opts) do
default_settings = HTTP.get_private(conn, :settings, [])
opts_settings = Keyword.get(opts, :settings, [])
Expand Down Expand Up @@ -375,7 +372,6 @@ defmodule Ch.Connection do
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
conn =
conn
|> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
Expand Down
2 changes: 0 additions & 2 deletions test/ch/connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1247,8 +1247,6 @@ defmodule Ch.ConnectionTest do
end

describe "options" do
# this test is flaky, sometimes it raises due to ownership timeout
@tag capture_log: true, skip: true
test "can provide custom timeout", %{conn: conn} do
assert {:error, %Mint.TransportError{reason: :timeout} = error} =
Ch.query(conn, "select sleep(1)", _params = [], timeout: 100)
Expand Down

0 comments on commit 4dac9e7

Please sign in to comment.