From 5968aefadd6aca25751ddbadb7af7f89db6770f1 Mon Sep 17 00:00:00 2001 From: Tom Taylor <tom@tomtaylor.co.uk> Date: Mon, 22 Apr 2024 12:42:48 +0100 Subject: [PATCH] Support pluggable JSON libraries It's handy to be able to drop in a faster JSON library, such as Jsonrs, for use cases with very large requests and responses, as JSON encoding and decoding time is greatly reduced. --- lib/snap.ex | 3 + lib/snap/bulk/action.ex | 148 +++++++++++++++++++++---------------- lib/snap/bulk/actions.ex | 53 ++++--------- lib/snap/bulk/bulk.ex | 9 ++- lib/snap/cluster.ex | 10 ++- lib/snap/multi/multi.ex | 19 ++--- lib/snap/request.ex | 28 ++++--- test/bulk/actions_test.exs | 26 +++++-- test/bulk/bulk_test.exs | 48 ++++++------ test/indexes_test.exs | 4 +- test/multi/multi_test.exs | 2 +- test/search_test.exs | 4 +- 12 files changed, 193 insertions(+), 161 deletions(-) diff --git a/lib/snap.ex b/lib/snap.ex index 3ac417a..1062437 100644 --- a/lib/snap.ex +++ b/lib/snap.ex @@ -87,6 +87,9 @@ defmodule Snap do `[:my_app, :snap]`) * `index_namespace` - see `Snap.Cluster.Namespace` for details (defaults to `nil`) + * `json_library` - the library used for encoding/decoding JSON (defaults to + `Jason`. You may wish to switch this to [`Jsonrs`](https://hex.pm/packages/jsonrs) + for better performance encoding and decoding large requests and responses) ## Telemetry diff --git a/lib/snap/bulk/action.ex b/lib/snap/bulk/action.ex index eeddb63..49b4019 100644 --- a/lib/snap/bulk/action.ex +++ b/lib/snap/bulk/action.ex @@ -1,123 +1,143 @@ +defmodule Snap.Bulk.Action do + @moduledoc false + @callback to_action_json(struct()) :: map() + @callback to_document_json(struct()) :: map() | nil +end + defmodule Snap.Bulk.Action.Create do @moduledoc """ Represents a create step in a `Snap.Bulk` operation """ + @behaviour Snap.Bulk.Action + @enforce_keys [:doc] - defstruct [:_index, :_id, :require_alias, :doc] + defstruct [:index, :id, :require_alias, :doc] @type t :: %__MODULE__{ - _index: String.t() | nil, - _id: String.t() | nil, + index: String.t() | nil, + id: String.t() | nil, require_alias: boolean() | nil, doc: map() } + + @doc false + def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do + values = %{_index: index, _id: id, require_alias: require_alias} + + values + |> Enum.reject(&is_nil(elem(&1, 1))) + |> Enum.into(%{}) + |> then(fn values -> %{"create" => values} end) + end + + @doc false + def to_document_json(%__MODULE__{doc: doc}) do + doc + end end defmodule Snap.Bulk.Action.Delete do @moduledoc """ Represents a delete step in a `Snap.Bulk` operation """ - @enforce_keys [:_id] - defstruct [:_index, :_id, :require_alias] + @behaviour Snap.Bulk.Action + + @enforce_keys [:id] + defstruct [:index, :id, :require_alias] @type t :: %__MODULE__{ - _index: String.t() | nil, - _id: String.t(), + index: String.t() | nil, + id: String.t(), require_alias: boolean() | nil } + + @doc false + def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do + values = %{_index: index, _id: id, require_alias: require_alias} + + values + |> Enum.reject(&is_nil(elem(&1, 1))) + |> Enum.into(%{}) + |> then(fn values -> %{"delete" => values} end) + end + + @doc false + def to_document_json(_), do: nil end defmodule Snap.Bulk.Action.Index do @moduledoc """ Represents an index step in a `Snap.Bulk` operation """ + @behaviour Snap.Bulk.Action + @enforce_keys [:doc] - defstruct [:_index, :_id, :require_alias, :doc] + defstruct [:index, :id, :require_alias, :doc] @type t :: %__MODULE__{ - _index: String.t() | nil, - _id: String.t() | nil, + index: String.t() | nil, + id: String.t() | nil, require_alias: boolean() | nil, doc: map() } + + @doc false + def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do + values = %{_index: index, _id: id, require_alias: require_alias} + + values + |> Enum.reject(&is_nil(elem(&1, 1))) + |> Enum.into(%{}) + |> then(fn values -> %{"index" => values} end) + end + + @doc false + def to_document_json(%__MODULE__{doc: doc}) do + doc + end end defmodule Snap.Bulk.Action.Update do @moduledoc """ Represents an update step in a `Snap.Bulk` operation """ + @behaviour Snap.Bulk.Action + @enforce_keys [:doc] defstruct [ - :_id, - :_index, - :_source, + :id, + :index, + :require_alias, :doc, :doc_as_upsert, - :require_alias, - :script, - :upsert + :script ] @type t :: %__MODULE__{ - _id: String.t() | nil, - _index: String.t() | nil, - _source: boolean() | nil, + id: String.t() | nil, + index: String.t() | nil, + require_alias: boolean() | nil, doc: map(), doc_as_upsert: boolean() | nil, - require_alias: boolean() | nil, - script: map() | nil, - upsert: map() | nil + script: map() | nil } -end - -defimpl Jason.Encoder, for: Snap.Bulk.Action.Create do - require Jason.Helpers - - def encode(%Snap.Bulk.Action.Create{_index: index, _id: id, require_alias: require_alias}, opts) do - values = [_index: index, _id: id, require_alias: require_alias] - - values - |> Enum.reject(&is_nil(elem(&1, 1))) - |> then(fn values -> %{"create" => Jason.OrderedObject.new(values)} end) - |> Jason.Encode.map(opts) - end -end - -defimpl Jason.Encoder, for: Snap.Bulk.Action.Delete do - require Jason.Helpers - - def encode(%Snap.Bulk.Action.Delete{_index: index, _id: id, require_alias: require_alias}, opts) do - values = [_index: index, _id: id, require_alias: require_alias] - - values - |> Enum.reject(&is_nil(elem(&1, 1))) - |> then(fn values -> %{"delete" => Jason.OrderedObject.new(values)} end) - |> Jason.Encode.map(opts) - end -end -defimpl Jason.Encoder, for: Snap.Bulk.Action.Update do - require Jason.Helpers - - def encode(%Snap.Bulk.Action.Update{_index: index, _id: id, require_alias: require_alias}, opts) do - values = [_index: index, _id: id, require_alias: require_alias] + @doc false + def to_action_json(%__MODULE__{index: index, id: id, require_alias: require_alias}) do + values = %{_index: index, _id: id, require_alias: require_alias} values |> Enum.reject(&is_nil(elem(&1, 1))) - |> then(fn values -> %{"update" => Jason.OrderedObject.new(values)} end) - |> Jason.Encode.map(opts) + |> Enum.into(%{}) + |> then(fn values -> %{"update" => values} end) end -end - -defimpl Jason.Encoder, for: Snap.Bulk.Action.Index do - require Jason.Helpers - def encode(%Snap.Bulk.Action.Index{_index: index, _id: id, require_alias: require_alias}, opts) do - values = [_index: index, _id: id, require_alias: require_alias] + @doc false + def to_document_json(%__MODULE__{doc: doc, doc_as_upsert: doc_as_upsert, script: script}) do + values = %{doc: doc, doc_as_upsert: doc_as_upsert, script: script} values |> Enum.reject(&is_nil(elem(&1, 1))) - |> then(fn values -> %{"index" => Jason.OrderedObject.new(values)} end) - |> Jason.Encode.map(opts) + |> Enum.into(%{}) end end diff --git a/lib/snap/bulk/actions.ex b/lib/snap/bulk/actions.ex index 227a326..8107b97 100644 --- a/lib/snap/bulk/actions.ex +++ b/lib/snap/bulk/actions.ex @@ -1,56 +1,35 @@ defmodule Snap.Bulk.Actions do @moduledoc false - alias Snap.Bulk.Action.{Create, Index, Update, Delete} - @doc """ Encodes a list of bulk action structs into line separated JSON for feeding to the /_bulk endpoint. """ - def encode(actions) do - encode_actions([], actions) + def encode(actions, json_library \\ Jason) do + encode_actions([], actions, json_library) end - defp encode_actions(iolist, []) do + defp encode_actions(iolist, [], _json_library) do iolist end - defp encode_actions(iolist, [head | tail]) do - updated_iolist = [iolist, encode_action(head)] - encode_actions(updated_iolist, tail) - end - - defp encode_action(%type{} = action) when type in [Create, Index] do - doc = action.doc - - doc_json = - doc - |> Jason.encode!() - - action_json = encode_action_command(action) - - [action_json, "\n", doc_json, "\n"] + defp encode_actions(iolist, [head | tail], json_library) do + updated_iolist = [iolist, encode_action(head, json_library)] + encode_actions(updated_iolist, tail, json_library) end - defp encode_action(%Delete{} = action) do - action_json = encode_action_command(action) - - [action_json, "\n"] - end - - defp encode_action(%Update{} = action) do - doc = action.doc - - doc_json = - %{doc: doc} - |> Jason.encode!() - - action_json = encode_action_command(action) + defp encode_action(%type{} = action, json_library) do + action_json = type.to_action_json(action) + doc_json = type.to_document_json(action) - [action_json, "\n", doc_json, "\n"] + if doc_json do + [encode_json(action_json, json_library), "\n", encode_json(doc_json, json_library), "\n"] + else + [encode_json(action_json, json_library), "\n"] + end end - defp encode_action_command(action) do - Jason.encode!(action) + defp encode_json(json, json_library) do + json_library.encode!(json) end end diff --git a/lib/snap/bulk/bulk.ex b/lib/snap/bulk/bulk.ex index f8d9269..7905d96 100644 --- a/lib/snap/bulk/bulk.ex +++ b/lib/snap/bulk/bulk.ex @@ -21,9 +21,9 @@ defmodule Snap.Bulk do ``` actions = [ - %Snap.Bulk.Action.Create{_id: 1, doc: %{foo: "bar"}}, - %Snap.Bulk.Action.Create{_id: 2, doc: %{foo: "bar"}}, - %Snap.Bulk.Action.Create{_id: 3, doc: %{foo: "bar"}} + %Snap.Bulk.Action.Create{id: 1, doc: %{foo: "bar"}}, + %Snap.Bulk.Action.Create{id: 2, doc: %{foo: "bar"}}, + %Snap.Bulk.Action.Create{id: 3, doc: %{foo: "bar"}} ] actions @@ -111,7 +111,8 @@ defmodule Snap.Bulk do end defp process_chunk(actions, cluster, index, params, request_opts, error_count, _max_errors) do - body = Actions.encode(actions) + json_library = cluster.json_library() + body = Actions.encode(actions, json_library) headers = [{"content-type", "application/x-ndjson"}] diff --git a/lib/snap/cluster.ex b/lib/snap/cluster.ex index c329e51..091030c 100644 --- a/lib/snap/cluster.ex +++ b/lib/snap/cluster.ex @@ -44,6 +44,13 @@ defmodule Snap.Cluster do Supervisor.config(__MODULE__) end + @doc """ + Returns the JSON library configured for the Cluster. + """ + def json_library() do + Keyword.get(config(), :json_library, Jason) + end + @doc """ Returns the otp_app that the Cluster was defined with. """ @@ -124,7 +131,8 @@ defmodule Snap.Cluster do index_namespace: String.t() | nil, telemetry_prefix: list(atom()), http_client_adapter: - Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()} + Snap.HTTPClient.t() | {Snap.HTTPClient.t(), adapter_config :: Keyword.t()}, + json_library: module() ] @doc """ diff --git a/lib/snap/multi/multi.ex b/lib/snap/multi/multi.ex index c9aed9e..d164298 100644 --- a/lib/snap/multi/multi.ex +++ b/lib/snap/multi/multi.ex @@ -66,7 +66,8 @@ defmodule Snap.Multi do {:ok, Snap.Multi.Response.t()} | {:error, Snap.Cluster.error()} def run(%__MODULE__{} = multi, cluster, index_or_alias, params \\ [], headers \\ [], opts \\ []) do ids = build_ids(multi.searches) - body = encode(multi) + json_library = cluster.json_library() + body = encode(multi, json_library) headers = headers ++ [{"content-type", "application/x-ndjson"}] namespaced_index = Namespace.add_namespace_to_index(index_or_alias, cluster) @@ -76,23 +77,23 @@ defmodule Snap.Multi do end end - defp encode(%__MODULE__{} = multi) do + defp encode(%__MODULE__{} = multi, json_library) do multi.searches - |> Enum.flat_map(&encode_search/1) + |> Enum.flat_map(&encode_search(&1, json_library)) end - defp encode_search(%Search{headers: headers, body: body}) do - [encode_headers(headers), "\n", encode_body(body), "\n"] + defp encode_search(%Search{headers: headers, body: body}, json_library) do + [encode_headers(headers, json_library), "\n", encode_body(body, json_library), "\n"] end - defp encode_headers(headers) do + defp encode_headers(headers, json_library) do headers |> Enum.into(%{}) - |> Jason.encode!(pretty: false) + |> json_library.encode!(pretty: false) end - defp encode_body(body) do - Jason.encode!(body, pretty: false) + defp encode_body(body, json_library) do + json_library.encode!(body, pretty: false) end defp build_ids(searches) do diff --git a/lib/snap/request.ex b/lib/snap/request.ex index c1ab4d4..82fb475 100644 --- a/lib/snap/request.ex +++ b/lib/snap/request.ex @@ -1,6 +1,10 @@ defmodule Snap.Request do - @moduledoc false + @moduledoc """ + Supports making arbitrary requests against `Snap.Cluster`. + In most cases you're better off using the functions in `Snap.Cluster` + directly, e.g. `c:Snap.Cluster.get/4`. + """ require Logger alias Snap.HTTPClient @@ -10,9 +14,13 @@ defmodule Snap.Request do {"accept", "application/json"} ] + @doc """ + Makes an HTTP request against a `Snap.Cluster` + """ def request(cluster, method, path, body \\ nil, params \\ [], headers \\ [], opts \\ []) do config = cluster.config() auth = Keyword.get(config, :auth, Snap.Auth.Plain) + json_library = cluster.json_library() url = config @@ -20,7 +28,7 @@ defmodule Snap.Request do |> URI.merge(path) |> append_query_params(params) - body = encode_body(body) + body = encode_body(body, json_library) headers = set_default_headers(headers) start_time = System.monotonic_time() @@ -30,7 +38,7 @@ defmodule Snap.Request do response_time = System.monotonic_time() - start_time - result = parse_response(response) + result = parse_response(response, json_library) decode_time = System.monotonic_time() - response_time - start_time total_time = response_time + decode_time @@ -63,15 +71,15 @@ defmodule Snap.Request do end end - defp parse_response(response) do + defp parse_response(response, json_library) do case response do {:ok, %HTTPClient.Response{body: data, status: status}} when status >= 200 and status < 300 -> - Jason.decode(data) + json_library.decode(data) {:ok, %HTTPClient.Response{body: data} = response} -> # If there's no valid JSON treat the error as an HTTPError. - case Jason.decode(data) do + case json_library.decode(data) do {:ok, json} -> exception = Snap.ResponseError.exception_from_json(json) {:error, exception} @@ -108,10 +116,10 @@ defmodule Snap.Request do } end - defp encode_body(body) when is_map(body), do: Jason.encode!(body) - defp encode_body(body), do: body + defp encode_body(body, json_library) when is_map(body), do: json_library.encode!(body) + defp encode_body(body, _json_library), do: body - def append_query_params(url, query_params \\ []) do + defp append_query_params(url, query_params) do uri = URI.parse(url) query_params_str = Map.get(uri, :query) || "" @@ -133,7 +141,7 @@ defmodule Snap.Request do |> URI.to_string() end - def set_default_headers(headers) do + defp set_default_headers(headers) do @default_headers |> Enum.reduce(headers, fn {key, _value} = tuple, acc -> if List.keymember?(acc, key, 0) do diff --git a/test/bulk/actions_test.exs b/test/bulk/actions_test.exs index 69ff8ab..2e93b81 100644 --- a/test/bulk/actions_test.exs +++ b/test/bulk/actions_test.exs @@ -6,18 +6,30 @@ defmodule Snap.Bulk.ActionsTest do alias Snap.Bulk.Actions test "encoding actions" do - doc = %{foo: "bar"} + doc = %{"foo" => "bar"} actions = [ - %Action.Index{_index: "foo", doc: doc}, - %Action.Create{_index: "foo", doc: doc, require_alias: true}, - %Action.Update{_index: "foo", doc: doc, _id: 2}, - %Action.Delete{_index: "foo", _id: 1} + %Action.Index{index: "foo", doc: doc}, + %Action.Create{index: "foo", doc: doc, require_alias: true}, + %Action.Update{index: "foo", doc: doc, id: 2}, + %Action.Delete{index: "foo", id: 1} ] encoded = Actions.encode(actions) |> IO.chardata_to_string() - assert encoded == - "{\"index\":{\"_index\":\"foo\"}}\n{\"foo\":\"bar\"}\n{\"create\":{\"_index\":\"foo\",\"require_alias\":true}}\n{\"foo\":\"bar\"}\n{\"update\":{\"_index\":\"foo\",\"_id\":2}}\n{\"doc\":{\"foo\":\"bar\"}}\n{\"delete\":{\"_index\":\"foo\",\"_id\":1}}\n" + lines = + encoded + |> String.split("\n", trim: true) + |> Enum.map(&Jason.decode!/1) + + assert lines == [ + %{"index" => %{"_index" => "foo"}}, + doc, + %{"create" => %{"_index" => "foo", "require_alias" => true}}, + doc, + %{"update" => %{"_index" => "foo", "_id" => 2}}, + %{"doc" => doc}, + %{"delete" => %{"_index" => "foo", "_id" => 1}} + ] end end diff --git a/test/bulk/bulk_test.exs b/test/bulk/bulk_test.exs index e88d697..af0bd97 100644 --- a/test/bulk/bulk_test.exs +++ b/test/bulk/bulk_test.exs @@ -16,10 +16,10 @@ defmodule Snap.BulkTest do doc = %{foo: "bar"} actions = [ - %Action.Index{doc: doc, _id: 1}, - %Action.Index{doc: doc, _id: 2}, - %Action.Delete{_id: 1}, - %Action.Delete{_id: 2} + %Action.Index{doc: doc, id: 1}, + %Action.Index{doc: doc, id: 2}, + %Action.Delete{id: 1}, + %Action.Delete{id: 2} ] result = @@ -35,10 +35,10 @@ defmodule Snap.BulkTest do doc = %{foo: "bar"} actions = [ - %Action.Index{doc: doc, _id: 1}, - %Action.Update{doc: doc, _id: 2}, - %Action.Update{doc: doc, _id: 3}, - %Action.Update{doc: doc, _id: 4} + %Action.Index{doc: doc, id: 1}, + %Action.Update{doc: doc, id: 2}, + %Action.Update{doc: doc, id: 3}, + %Action.Update{doc: doc, id: 4} ] {:error, %Snap.BulkError{errors: errors}} = @@ -58,10 +58,10 @@ defmodule Snap.BulkTest do doc = %{foo: "bar"} actions = [ - %Action.Update{_id: 1, doc: doc}, - %Action.Update{_id: 2, doc: doc}, - %Action.Update{_id: 3, doc: doc}, - %Action.Update{_id: 4, doc: doc} + %Action.Update{id: 1, doc: doc}, + %Action.Update{id: 2, doc: doc}, + %Action.Update{id: 3, doc: doc}, + %Action.Update{id: 4, doc: doc} ] {:error, %Snap.BulkError{errors: errors}} = @@ -77,12 +77,12 @@ defmodule Snap.BulkTest do doc = %{foo: "bar"} actions = [ - %Action.Update{_id: 1, doc: doc}, - %Action.Update{_id: 2, doc: doc}, - %Action.Update{_id: 3, doc: doc}, - %Action.Update{_id: 4, doc: doc}, - %Action.Update{_id: 5, doc: doc}, - %Action.Update{_id: 6, doc: doc} + %Action.Update{id: 1, doc: doc}, + %Action.Update{id: 2, doc: doc}, + %Action.Update{id: 3, doc: doc}, + %Action.Update{id: 4, doc: doc}, + %Action.Update{id: 5, doc: doc}, + %Action.Update{id: 6, doc: doc} ] {:error, %Snap.BulkError{errors: errors}} = @@ -98,12 +98,12 @@ defmodule Snap.BulkTest do doc = %{foo: "bar"} actions = [ - %Action.Index{_id: 1, doc: doc}, - %Action.Index{_id: 2, doc: doc}, - %Action.Index{_id: 3, doc: doc}, - %Action.Index{_id: 4, doc: doc}, - %Action.Index{_id: 5, doc: doc}, - %Action.Index{_id: 6, doc: doc} + %Action.Index{id: 1, doc: doc}, + %Action.Index{id: 2, doc: doc}, + %Action.Index{id: 3, doc: doc}, + %Action.Index{id: 4, doc: doc}, + %Action.Index{id: 5, doc: doc}, + %Action.Index{id: 6, doc: doc} ] assert :ok == diff --git a/test/indexes_test.exs b/test/indexes_test.exs index 44bc5ef..c51313e 100644 --- a/test/indexes_test.exs +++ b/test/indexes_test.exs @@ -43,7 +43,7 @@ defmodule Snap.IndexesTest do |> Stream.map(fn i -> doc = %{"title" => "Document #{i}"} - %Create{_id: i, doc: doc} + %Create{id: i, doc: doc} end) |> Indexes.hotswap(Cluster, @test_index, %{}, page_wait: 0, page_size: 1_000) @@ -67,7 +67,7 @@ defmodule Snap.IndexesTest do |> Stream.map(fn i -> doc = %{"title" => "Document #{i}"} - %Create{_id: i, doc: doc} + %Create{id: i, doc: doc} end) |> Indexes.hotswap(Cluster, @test_index, %{}, page_wait: 0, page_size: 1_000) diff --git a/test/multi/multi_test.exs b/test/multi/multi_test.exs index 944da07..1ebd35b 100644 --- a/test/multi/multi_test.exs +++ b/test/multi/multi_test.exs @@ -15,7 +15,7 @@ defmodule Snap.MultiTest do |> Enum.map(fn i -> doc = %{"title" => "Document #{i}"} - %Action.Index{_id: i, doc: doc} + %Action.Index{id: i, doc: doc} end) |> Snap.Bulk.perform(Cluster, @test_index, refresh: true) end diff --git a/test/search_test.exs b/test/search_test.exs index 0d16d94..5b054d5 100644 --- a/test/search_test.exs +++ b/test/search_test.exs @@ -15,7 +15,7 @@ defmodule Snap.SearchTest do |> Enum.map(fn i -> doc = %{"title" => "Document #{i}"} - %Action.Index{_id: i, doc: doc} + %Action.Index{id: i, doc: doc} end) |> Snap.Bulk.perform(Cluster, @test_index, refresh: true) @@ -36,7 +36,7 @@ defmodule Snap.SearchTest do |> Enum.map(fn i -> doc = %{"title" => "Document #{i}"} - %Action.Index{_id: i, doc: doc} + %Action.Index{id: i, doc: doc} end) |> Snap.Bulk.perform(Cluster, @test_index, refresh: true)