diff --git a/lib/ch.ex b/lib/ch.ex index a027102..073c55c 100644 --- a/lib/ch.ex +++ b/lib/ch.ex @@ -2,47 +2,83 @@ defmodule Ch do @moduledoc "Minimal HTTP ClickHouse client." alias Ch.{Connection, Query, Result} + @type common_option :: + {:database, String.t()} + | {:username, String.t()} + | {:password, String.t()} + | {:settings, Keyword.t()} + | {:timeout, timeout} + + @type start_option :: + common_option + | {:scheme, String.t()} + | {:hostname, String.t()} + | {:port, :inet.port_number()} + | {:transport_opts, :gen_tcp.connect_option()} + | DBConnection.start_option() + @doc """ Start the connection process and connect to ClickHouse. ## Options + * `:scheme` - HTTP scheme, defaults to `"http"` * `:hostname` - server hostname, defaults to `"localhost"` * `:port` - HTTP port, defualts to `8123` - * `:scheme` - HTTP scheme, defaults to `"http"` + * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info * `:database` - Database, defaults to `"default"` * `:username` - Username * `:password` - User password * `:settings` - Keyword list of ClickHouse settings * `:timeout` - HTTP receive timeout in milliseconds * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info + * [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0) """ + @spec start_link([start_option]) :: GenServer.on_start() def start_link(opts \\ []) do DBConnection.start_link(Connection, opts) end @doc """ Returns a supervisor child specification for a DBConnection pool. + + See `start_link/1` for supported options. """ + @spec child_spec([start_option]) :: :supervisor.child_spec() def child_spec(opts) do DBConnection.child_spec(Connection, opts) end + @type query_option :: + common_option + | {:command, Ch.Query.command()} + | {:headers, [{String.t(), String.t()}]} + | {:format, String.t()} + # TODO remove + | {:encode, boolean} + | {:decode, boolean} + | DBConnection.connection_option() + @doc """ Runs a query and returns the result as `{:ok, %Ch.Result{}}` or `{:error, Exception.t()}` if there was a database error. ## Options - * `:timeout` - Query request timeout - * `:settings` - Keyword list of settings * `:database` - Database * `:username` - Username * `:password` - User password + * `:settings` - Keyword list of settings + * `:timeout` - Query request timeout + * `:command` - Command tag for the query + * `:headers` - Custom HTTP headers for the request + * `:format` - Custom response format for the request + * `:decode` - Whether to automatically decode the response + * [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0) """ - @spec query(DBConnection.conn(), iodata, params, Keyword.t()) :: + @spec query(DBConnection.conn(), iodata, params, [query_option]) :: {:ok, Result.t()} | {:error, Exception.t()} when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query(conn, statement, params \\ [], opts \\ []) do @@ -57,7 +93,7 @@ defmodule Ch do Runs a query and returns the result or raises `Ch.Error` if there was an error. See `query/4`. """ - @spec query!(DBConnection.conn(), iodata, params, Keyword.t()) :: Result.t() + @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t() when params: map | [term] | [row :: [term]] | iodata | Enumerable.t() def query!(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) @@ -65,7 +101,7 @@ defmodule Ch do end @doc false - @spec stream(DBConnection.t(), iodata, map | [term], Keyword.t()) :: DBConnection.Stream.t() + @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: DBConnection.Stream.t() def stream(conn, statement, params \\ [], opts \\ []) do query = Query.build(statement, opts) DBConnection.stream(conn, query, params, opts) diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex index 70fcaf7..925de87 100644 --- a/lib/ch/connection.ex +++ b/lib/ch/connection.ex @@ -10,7 +10,7 @@ defmodule Ch.Connection do @typep conn :: HTTP.t() @impl true - @spec connect(Keyword.t()) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} + @spec connect([Ch.start_option()]) :: {:ok, conn} | {:error, Error.t() | Mint.Types.error()} def connect(opts) do scheme = String.to_existing_atom(opts[:scheme] || "http") address = opts[:hostname] || "localhost" @@ -171,7 +171,7 @@ defmodule Ch.Connection do @typep response :: Mint.Types.status() | Mint.Types.headers() | binary - @spec request(conn, binary, binary, Mint.Types.headers(), iodata, Keyword.t()) :: + @spec request(conn, binary, binary, Mint.Types.headers(), iodata, [Ch.query_option()]) :: {:ok, conn, [response]} | {:error, Error.t(), conn} | {:disconnect, Mint.Types.error(), conn} diff --git a/lib/ch/query.ex b/lib/ch/query.ex index 2e6888d..ee44b87 100644 --- a/lib/ch/query.ex +++ b/lib/ch/query.ex @@ -2,10 +2,10 @@ defmodule Ch.Query do @moduledoc "Query struct wrapping the SQL statement." defstruct [:statement, :command, :encode, :decode] - @type t :: %__MODULE__{statement: iodata, command: atom, encode: boolean, decode: boolean} + @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean} @doc false - @spec build(iodata, Keyword.t()) :: t + @spec build(iodata, [Ch.query_option()]) :: t def build(statement, opts \\ []) do command = Keyword.get(opts, :command) || extract_command(statement) encode = Keyword.get(opts, :encode, true) @@ -43,6 +43,13 @@ defmodule Ch.Query do {"WATCH", :watch} ] + command_union = + statements + |> Enum.map(fn {_, command} -> command end) + |> Enum.reduce(&{:|, [], [&1, &2]}) + + @type command :: unquote(command_union) + defp extract_command(statement) for {statement, command} <- statements do @@ -64,13 +71,14 @@ end defimpl DBConnection.Query, for: Ch.Query do alias Ch.{Query, Result, RowBinary} - @spec parse(Query.t(), Keyword.t()) :: Query.t() + @spec parse(Query.t(), [Ch.query_option()]) :: Query.t() def parse(query, _opts), do: query - @spec describe(Query.t(), Keyword.t()) :: Query.t() + @spec describe(Query.t(), [Ch.query_option()]) :: Query.t() def describe(query, _opts), do: query - @spec encode(Query.t(), params, Keyword.t()) :: {query_params, Mint.Types.headers(), body} + @spec encode(Query.t(), params, [Ch.query_option()]) :: + {query_params, Mint.Types.headers(), body} when params: map | [term] | [row :: [term]] | iodata | Enumerable.t(), query_params: [{String.t(), String.t()}], body: iodata | Enumerable.t() @@ -120,7 +128,7 @@ defimpl DBConnection.Query, for: Ch.Query do |> format_row_binary?() end - @spec decode(Query.t(), [response], Keyword.t()) :: Result.t() + @spec decode(Query.t(), [response], [Ch.query_option()]) :: Result.t() when response: Mint.Types.status() | Mint.Types.headers() | binary def decode(%Query{command: :insert}, responses, _opts) do [_status, headers | _data] = responses diff --git a/lib/ch/result.ex b/lib/ch/result.ex index ddca4df..848444d 100644 --- a/lib/ch/result.ex +++ b/lib/ch/result.ex @@ -14,7 +14,7 @@ defmodule Ch.Result do defstruct [:command, :num_rows, :rows, :headers] @type t :: %__MODULE__{ - command: atom, + command: Ch.Query.command(), num_rows: non_neg_integer | nil, rows: [[term]] | iodata | nil, headers: Mint.Types.headers()