diff --git a/lib/explorer/backend/data_frame.ex b/lib/explorer/backend/data_frame.ex index b92f3dbbf..8a0839747 100644 --- a/lib/explorer/backend/data_frame.ex +++ b/lib/explorer/backend/data_frame.ex @@ -79,6 +79,9 @@ defmodule Explorer.Backend.DataFrame do eol_delimiter :: option(String.t()) ) :: io_result(df) + # IO: deltalake + @callback to_delta(df, table_uri :: fs_entry()) :: ok_result() + # IO: Parquet @callback from_parquet( entry :: fs_entry(), diff --git a/lib/explorer/data_frame.ex b/lib/explorer/data_frame.ex index 847bb9beb..355fbfd75 100644 --- a/lib/explorer/data_frame.ex +++ b/lib/explorer/data_frame.ex @@ -1392,6 +1392,33 @@ defmodule Explorer.DataFrame do end end + @doc """ + Writes a dataframe to a deltalake format file. + """ + @doc tyoe: :io + @spec to_delta(df :: DataFrame.t(), table_uri :: String.t() | fs_entry()) :: :ok | {:error, Exception.t()} + def to_delta(df, table_uri) do + Shared.apply_impl(df, :to_delta, [table_uri]) + end + + @doc """ + Similar to `to_delta/2`, but raises in case of error. + """ + @doc type: :io + @spec to_delta!(df :: DataFrame.t(), table_uri :: String.t() | fs_entry()) :: :ok + def to_delta!(df, filename) do + case to_delta(df, filename) do + :ok -> + :ok + + {:error, %module{} = e} when module in [ArgumentError, RuntimeError] -> + raise module, "to_delta failed: #{inspect(e.message)}" + + {:error, error} -> + raise "to_delta failed: #{inspect(error)}" + end + end + @doc """ Read a file of JSON objects or lists separated by new lines diff --git a/lib/explorer/polars_backend/data_frame.ex b/lib/explorer/polars_backend/data_frame.ex index b2fd16623..7d9f6baf9 100644 --- a/lib/explorer/polars_backend/data_frame.ex +++ b/lib/explorer/polars_backend/data_frame.ex @@ -222,6 +222,22 @@ defmodule Explorer.PolarsBackend.DataFrame do defp char_byte(nil), do: nil defp char_byte(<>), do: char + @impl true + def to_delta(%DataFrame{data: df}, %Local.Entry{} = entry) do + case Native.df_to_delta(df, entry.path) do + {:ok, _} -> :ok + {:error, error} -> {:error, RuntimeError.exception(error)} + end + end + + @impl true + def to_delta(%DataFrame{data: df}, entry) do + case Native.df_to_delta(df, entry) do + {:ok, _} -> :ok + {:error, error} -> {:error, RuntimeError.exception(error)} + end + end + @impl true def from_ndjson(%module{} = entry, infer_schema_length, batch_size) diff --git a/lib/explorer/polars_backend/native.ex b/lib/explorer/polars_backend/native.ex index 601719257..29438bdee 100644 --- a/lib/explorer/polars_backend/native.ex +++ b/lib/explorer/polars_backend/native.ex @@ -168,6 +168,7 @@ defmodule Explorer.PolarsBackend.Native do def df_transpose(_df, _keep_names_as, _new_col_names), do: err() def df_to_csv(_df, _filename, _has_headers, _delimiter), do: err() def df_to_csv_cloud(_df, _ex_entry, _has_headers, _delimiter), do: err() + def df_to_delta(_df, _table_uri), do: err() def df_to_dummies(_df, _columns), do: err() def df_to_ipc(_df, _filename, _compression), do: err() def df_to_ipc_cloud(_df, _ex_entry, _compression), do: err() diff --git a/native/explorer/Cargo.lock b/native/explorer/Cargo.lock index 5b7efe647..aab0bb1cb 100644 --- a/native/explorer/Cargo.lock +++ b/native/explorer/Cargo.lock @@ -24,6 +24,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", + "const-random", "getrandom", "once_cell", "version_check", @@ -96,6 +97,118 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf7d0a018de4f6aa429b9d33d69edf69072b1c5b1cb8d3e4a5f7ef898fc3eb76" +[[package]] +name = "arrow" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fab9e93ba8ce88a37d5a30dce4b9913b75413dc1ac56cb5d72e5a840543f829" +dependencies = [ + "ahash", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-csv", + "arrow-data", + "arrow-ipc", + "arrow-json", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", +] + +[[package]] +name = "arrow-arith" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc1d4e368e87ad9ee64f28b9577a3834ce10fe2703a26b28417d485bbbdff956" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "num", +] + +[[package]] +name = "arrow-array" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d02efa7253ede102d45a4e802a129e83bcc3f49884cab795b1ac223918e4318d" +dependencies = [ + "ahash", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "hashbrown 0.14.3", + "num", +] + +[[package]] +name = "arrow-buffer" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fda119225204141138cb0541c692fbfef0e875ba01bfdeaed09e9d354f9d6195" +dependencies = [ + "bytes", + "half", + "num", +] + +[[package]] +name = "arrow-cast" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d825d51b9968868d50bc5af92388754056796dbc62a4e25307d588a1fc84dee" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "chrono", + "half", + "lexical-core", + "num", +] + +[[package]] +name = "arrow-csv" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43ef855dc6b126dc197f43e061d4de46b9d4c033aa51c2587657f7508242cef1" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "csv", + "csv-core", + "lazy_static", + "lexical-core", + "regex", +] + +[[package]] +name = "arrow-data" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "475a4c3699c8b4095ca61cecf15da6f67841847a5f5aac983ccb9a377d02f73a" +dependencies = [ + "arrow-buffer", + "arrow-schema", + "half", + "num", +] + [[package]] name = "arrow-format" version = "0.8.1" @@ -106,6 +219,109 @@ dependencies = [ "serde", ] +[[package]] +name = "arrow-ipc" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1248005c8ac549f869b7a840859d942bf62471479c1a2d82659d453eebcd166a" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "flatbuffers", +] + +[[package]] +name = "arrow-json" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f03d7e3b04dd688ccec354fe449aed56b831679f03e44ee2c1cfc4045067b69c" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-schema", + "chrono", + "half", + "indexmap", + "lexical-core", + "num", + "serde", + "serde_json", +] + +[[package]] +name = "arrow-ord" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b87aa408ea6a6300e49eb2eba0c032c88ed9dc19e0a9948489c55efdca71f4" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "half", + "num", +] + +[[package]] +name = "arrow-row" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "114a348ab581e7c9b6908fcab23cb39ff9f060eb19e72b13f8fb8eaa37f65d22" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "half", + "hashbrown 0.14.3", +] + +[[package]] +name = "arrow-schema" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d1d179c117b158853e0101bfbed5615e86fe97ee356b4af901f1c5001e1ce4b" +dependencies = [ + "serde", +] + +[[package]] +name = "arrow-select" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5c71e003202e67e9db139e5278c79f5520bb79922261dfe140e4637ee8b6108" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "num", +] + +[[package]] +name = "arrow-string" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4cebbb282d6b9244895f4a9a912e55e57bce112554c7fa91fcec5459cb421ab" +dependencies = [ + "arrow-array", + "arrow-buffer", + "arrow-data", + "arrow-schema", + "arrow-select", + "num", + "regex", + "regex-syntax 0.7.5", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -240,6 +456,12 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + [[package]] name = "bytes" version = "1.5.0" @@ -286,6 +508,26 @@ dependencies = [ "cc", ] +[[package]] +name = "const-random" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaf16c9c2c612020bcfd042e170f6e32de9b9d75adb5277cdbbd2e2c8c8299a" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -354,6 +596,75 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + +[[package]] +name = "deltalake" +version = "0.16.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb694b21358cfa35ec1ccf9443269d6e21a9afba5e942dceb50019748271e811" +dependencies = [ + "arrow", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-trait", + "bytes", + "cfg-if", + "chrono", + "errno", + "futures", + "itertools", + "lazy_static", + "libc", + "log", + "num-bigint", + "num-traits", + "num_cpus", + "object_store 0.7.1", + "once_cell", + "parking_lot", + "parquet", + "percent-encoding", + "rand", + "regex", + "serde", + "serde_json", + "thiserror", + "tokio", + "url", + "uuid", +] + [[package]] name = "doc-comment" version = "0.3.3" @@ -399,6 +710,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "ethnum" version = "1.5.0" @@ -411,14 +732,16 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "deltalake", "either", "mimalloc", - "object_store", + "object_store 0.8.0", "polars", "polars-ops", "rand", "rand_pcg", "rustler", + "serde_json", "smartstring", "thiserror", "tokio", @@ -437,6 +760,16 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95765f67b4b18863968b4a1bd5bb576f732b29a4a28c7cd84c09fa3e2875f33c" +[[package]] +name = "flatbuffers" +version = "23.5.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4dac53e22462d78c16d64a1cd22371b54cc3fe94aa15e7886a2fa6e5d1ab8640" +dependencies = [ + "bitflags 1.3.2", + "rustc_version", +] + [[package]] name = "flate2" version = "1.0.28" @@ -611,6 +944,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc52e53916c08643f1b56ec082790d1e86a32e58dc5268f897f313fbae7b4872" +dependencies = [ + "cfg-if", + "crunchy", + "num-traits", +] + [[package]] name = "halfbrown" version = "0.2.4" @@ -783,6 +1127,12 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "ipnet" version = "2.9.0" @@ -1056,6 +1406,73 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05180d69e3da0e530ba2a1dae5110317e49e3b7f3d41be227dc5f92e49ee7af" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + +[[package]] +name = "num-bigint" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "608e7659b5c3d7cba262d894801b9ec9d00de989e8a82bd4bef91d08da45cdc0" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-complex" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "225d3389fb3509a24c93f5c29eb6bde2586b98d9f016636dff58d7c6f7569cd9" +dependencies = [ + "autocfg", + "num-traits", +] + +[[package]] +name = "num-iter" +version = "0.1.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d03e6c028c5dc5cac6e2dec0efda81fc887605bb3d884578bb6d6bf7514e252" +dependencies = [ + "autocfg", + "num-integer", + "num-traits", +] + +[[package]] +name = "num-rational" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +dependencies = [ + "autocfg", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.17" @@ -1085,6 +1502,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f930c88a43b1c3f6e776dfe495b4afab89882dbc81530c632db2ed65451ebcb4" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "object_store" version = "0.8.0" @@ -1120,6 +1558,15 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "ordered-float" +version = "2.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c" +dependencies = [ + "num-traits", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1143,6 +1590,40 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "parquet" +version = "47.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0463cc3b256d5f50408c49a4be3a16674f4c8ceef60941709620a062b1f6bf4d" +dependencies = [ + "ahash", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-schema", + "arrow-select", + "base64", + "brotli", + "bytes", + "chrono", + "flate2", + "futures", + "hashbrown 0.14.3", + "lz4", + "num", + "num-bigint", + "object_store 0.7.1", + "paste", + "seq-macro", + "snap", + "thrift", + "tokio", + "twox-hash", + "zstd 0.12.4", +] + [[package]] name = "parquet-format-safe" version = "0.2.4" @@ -1153,6 +1634,12 @@ dependencies = [ "futures", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1233,7 +1720,7 @@ dependencies = [ "streaming-iterator", "strength_reduce", "version_check", - "zstd", + "zstd 0.13.0", ] [[package]] @@ -1287,7 +1774,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6396de788f99ebfc9968e7b6f523e23000506cde4ba6dfc62ae4ce949002a886" dependencies = [ "arrow-format", - "object_store", + "object_store 0.8.0", "regex", "simdutf8", "thiserror", @@ -1312,7 +1799,7 @@ dependencies = [ "memchr", "memmap2", "num-traits", - "object_store", + "object_store 0.8.0", "once_cell", "percent-encoding", "polars-arrow", @@ -1333,7 +1820,7 @@ dependencies = [ "tokio", "tokio-util", "url", - "zstd", + "zstd 0.13.0", ] [[package]] @@ -1433,7 +1920,7 @@ dependencies = [ "simdutf8", "snap", "streaming-decompression", - "zstd", + "zstd 0.13.0", ] [[package]] @@ -1694,7 +2181,7 @@ dependencies = [ "aho-corasick", "memchr", "regex-automata", - "regex-syntax", + "regex-syntax 0.8.2", ] [[package]] @@ -1705,9 +2192,15 @@ checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" dependencies = [ "aho-corasick", "memchr", - "regex-syntax", + "regex-syntax 0.8.2", ] +[[package]] +name = "regex-syntax" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" + [[package]] name = "regex-syntax" version = "0.8.2" @@ -1776,6 +2269,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +dependencies = [ + "semver", +] + [[package]] name = "rustler" version = "0.29.1" @@ -1877,6 +2379,12 @@ dependencies = [ "untrusted", ] +[[package]] +name = "semver" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97ed7a9823b74f99c7742f5336af7be5ecd3eeafcb1507d1fa93347b1d589b0" + [[package]] name = "seq-macro" version = "0.3.5" @@ -1905,9 +2413,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "69801b70b1c3dac963ecb03a364ba0ceda9cf60c71cfe475e99864759c8b8a79" dependencies = [ "itoa", "ryu", @@ -2152,6 +2660,26 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "ordered-float", +] + +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2178,6 +2706,7 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", "socket2", "tokio-macros", @@ -2262,6 +2791,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "twox-hash" +version = "1.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" +dependencies = [ + "cfg-if", + "static_assertions", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2309,6 +2848,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a" +dependencies = [ + "getrandom", + "serde", +] + [[package]] name = "value-trait" version = "0.8.0" @@ -2671,13 +3220,32 @@ dependencies = [ "syn 2.0.48", ] +[[package]] +name = "zstd" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a27595e173641171fc74a1232b7b1c7a7cb6e18222c11e9dfb9888fa424c53c" +dependencies = [ + "zstd-safe 6.0.6", +] + [[package]] name = "zstd" version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" dependencies = [ - "zstd-safe", + "zstd-safe 7.0.0", +] + +[[package]] +name = "zstd-safe" +version = "6.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee98ffd0b48ee95e6c5168188e44a54550b1564d9d530ee21d5f0eaed1069581" +dependencies = [ + "libc", + "zstd-sys", ] [[package]] diff --git a/native/explorer/Cargo.toml b/native/explorer/Cargo.toml index 96fbc892c..1b95d0eff 100644 --- a/native/explorer/Cargo.toml +++ b/native/explorer/Cargo.toml @@ -31,6 +31,13 @@ tokio-util = { version = "0.7", default-features = false, features = [ ], optional = true } object_store = { version = "0.8", default-features = false, optional = true } +# deltalake +deltalake = { version = "0.16.5", optional = true, default-features = true, features = [ + "arrow", + "parquet", +] } +serde_json = "1.0.113" + # MiMalloc won´t compile on Windows with the GCC compiler. # On Linux with Musl it won´t load correctly. [target.'cfg(not(any(all(windows, target_env = "gnu"), all(target_os = "linux", target_env = "musl"))))'.dependencies] @@ -86,7 +93,7 @@ version = "0.36" features = ["abs", "ewma", "cum_agg", "cov"] [features] -default = ["ndjson", "cloud", "nif_version_2_15"] +default = ["ndjson", "deltalake", "cloud", "nif_version_2_15"] cloud = ["object_store", "tokio", "tokio-util", "aws", "polars/cloud", "polars/cloud_write"] ndjson = ["polars/json"] diff --git a/native/explorer/src/dataframe/io.rs b/native/explorer/src/dataframe/io.rs index 55b7f6449..bb3a42d77 100644 --- a/native/explorer/src/dataframe/io.rs +++ b/native/explorer/src/dataframe/io.rs @@ -16,6 +16,7 @@ use std::fs::File; use std::io::{BufReader, BufWriter, Cursor}; use std::result::Result; use std::sync::Arc; +use std::collections::BTreeMap; use crate::datatypes::{ExParquetCompression, ExS3Entry, ExSeriesDtype}; use crate::{ExDataFrame, ExplorerError}; @@ -175,6 +176,206 @@ pub fn df_load_csv( Ok(ExDataFrame::new(reader.finish()?)) } +// =========== deltalake =========== // + +use deltalake::arrow::record_batch::RecordBatch; +use deltalake::writer::{DeltaWriter, RecordBatchWriter}; +use deltalake::errors::DeltaTableError; +use deltalake::DeltaTable; +use deltalake::parquet::{ + basic::{Compression, ZstdLevel}, + file::properties::WriterProperties, +}; +use tokio::runtime; + +#[rustler::nif(schedule = "DirtyIo")] +pub fn df_to_delta( + data: ExDataFrame, + table_uri: &str +) -> Result { + Ok(runtime::Builder::new_current_thread().build().unwrap().block_on(do_df_to_delta(&data, table_uri))?) +} + +fn to_delta_datatype(from: &crate::dataframe::arrow::datatypes::ArrowDataType) -> Result { + match from { + ArrowDataType::Utf8 => Ok(deltalake::schema::SchemaDataType::primitive("string".to_string())), + ArrowDataType::LargeUtf8 => Ok(deltalake::schema::SchemaDataType::primitive("string".to_string())), + ArrowDataType::Int64 => Ok(deltalake::schema::SchemaDataType::primitive("long".to_string())), // undocumented type + ArrowDataType::Int32 => Ok(deltalake::schema::SchemaDataType::primitive("integer".to_string())), + ArrowDataType::Int16 => Ok(deltalake::schema::SchemaDataType::primitive("short".to_string())), + ArrowDataType::Int8 => Ok(deltalake::schema::SchemaDataType::primitive("byte".to_string())), + ArrowDataType::UInt64 => Ok(deltalake::schema::SchemaDataType::primitive("long".to_string())), // undocumented type + ArrowDataType::UInt32 => Ok(deltalake::schema::SchemaDataType::primitive("integer".to_string())), + ArrowDataType::UInt16 => Ok(deltalake::schema::SchemaDataType::primitive("short".to_string())), + ArrowDataType::UInt8 => Ok(deltalake::schema::SchemaDataType::primitive("byte".to_string())), + ArrowDataType::Float32 => Ok(deltalake::schema::SchemaDataType::primitive("float".to_string())), + ArrowDataType::Float64 => Ok(deltalake::schema::SchemaDataType::primitive("double".to_string())), + ArrowDataType::Boolean => Ok(deltalake::schema::SchemaDataType::primitive("boolean".to_string())), + ArrowDataType::Binary => Ok(deltalake::schema::SchemaDataType::primitive("binary".to_string())), + ArrowDataType::FixedSizeBinary(_) => { + Ok(deltalake::schema::SchemaDataType::primitive("binary".to_string())) + } + ArrowDataType::LargeBinary => { + Ok(deltalake::schema::SchemaDataType::primitive("binary".to_string())) + } + ArrowDataType::Decimal(p, s) => Ok(deltalake::schema::SchemaDataType::primitive(format!( + "decimal({p},{s})" + ))), + ArrowDataType::Decimal256(p, s) => Ok(deltalake::schema::SchemaDataType::primitive(format!( + "decimal({p},{s})" + ))), + ArrowDataType::Date32 => Ok(deltalake::schema::SchemaDataType::primitive("date".to_string())), + ArrowDataType::Date64 => Ok(deltalake::schema::SchemaDataType::primitive("date".to_string())), + ArrowDataType::Timestamp(crate::dataframe::arrow::datatypes::TimeUnit::Microsecond, None) => { + Ok(deltalake::schema::SchemaDataType::primitive("timestamp".to_string())) + } + ArrowDataType::Timestamp(crate::dataframe::arrow::datatypes::TimeUnit::Microsecond, Some(tz)) + if tz.eq_ignore_ascii_case("utc") => + { + Ok(deltalake::schema::SchemaDataType::primitive("timestamp".to_string())) + } + ArrowDataType::Struct(fields) => { + let converted_fields: Result, _> = fields + .iter() + .map(|field| to_delta_schema_field(&field)) + .collect(); + Ok(deltalake::schema::SchemaDataType::r#struct( + deltalake::schema::SchemaTypeStruct::new(converted_fields?), + )) + } + ArrowDataType::List(field) => { + Ok(deltalake::schema::SchemaDataType::array(deltalake::schema::SchemaTypeArray::new( + Box::new(to_delta_datatype((*field).data_type())?), + (*field).is_nullable, + ))) + } + ArrowDataType::LargeList(field) => { + Ok(deltalake::schema::SchemaDataType::array(deltalake::schema::SchemaTypeArray::new( + Box::new(to_delta_datatype((*field).data_type())?), + (*field).is_nullable, + ))) + } + ArrowDataType::FixedSizeList(field, _) => { + Ok(deltalake::schema::SchemaDataType::array(deltalake::schema::SchemaTypeArray::new( + Box::new(to_delta_datatype((*field).data_type())?), + (*field).is_nullable, + ))) + } + ArrowDataType::Map(field, _) => { + if let ArrowDataType::Struct(struct_fields) = field.data_type() { + let key_type = to_delta_datatype(struct_fields[0].data_type())?; + let value_type = to_delta_datatype(struct_fields[1].data_type())?; + let value_type_nullable = struct_fields[1].is_nullable; + Ok(deltalake::schema::SchemaDataType::map(deltalake::schema::SchemaTypeMap::new( + Box::new(key_type), + Box::new(value_type), + value_type_nullable, + ))) + } else { + panic!("DataType::Map should contain a struct field child"); + } + } + s => Err(ExplorerError::Other(format!( + "Invalid data type for Delta Lake: {:?}", s + ))), + } +} + +fn to_delta_schema_field(from: &crate::dataframe::arrow::datatypes::Field) -> Result { + let metadata: std::collections::HashMap<_, serde_json::value::Value> = as Clone>::clone(&from.metadata).into_iter().map(|(key, value)| (key, serde_json::value::Value::String(value))).collect(); + let to_datatype = to_delta_datatype(&from.data_type)?; + Ok(deltalake::schema::SchemaField::new(from.name.clone(), to_datatype, from.is_nullable, metadata)) +} + +fn to_delta_schema(from: &crate::dataframe::arrow::datatypes::ArrowSchema) -> Result, ExplorerError> { + let mut to_fields: Vec = Vec::with_capacity(from.fields.capacity()); + for field in from.fields.iter() { + let new_field = to_delta_schema_field(field)?; + to_fields.push(new_field); + } + + Ok(to_fields) +} + +async fn do_df_to_delta( + data: &ExDataFrame, + table_uri: &str +) -> Result { + match deltalake::Path::parse(&table_uri) { + Ok(table_path) => { + let maybe_table = deltalake::open_table(&table_path).await; + let polars_schema = data.schema(); + let arrow_schema : crate::dataframe::arrow::datatypes::ArrowSchema = polars_schema.to_arrow(); + let deltalake_schema = to_delta_schema(&arrow_schema)?; + let mut table = match maybe_table { + Ok(table) => table, + Err(DeltaTableError::NotATable(_)) => { + deltalake::DeltaOps::try_from_uri(table_path) + .await + .unwrap() + .create() + .with_columns(deltalake_schema) + .await? + } + Err(err) => return Err(err)?, + }; + + let writer_properties = WriterProperties::builder() + .set_compression(Compression::ZSTD(ZstdLevel::try_new(3).unwrap())) + .build(); + + let mut writer = RecordBatchWriter::for_table(&table)?; + writer = writer.with_writer_properties(writer_properties); + + let mut chunk_idx = 0usize; + while let Some(batch) = convert_to_batch(&table, data, chunk_idx)? { + writer.write(batch).await?; + chunk_idx += 1; + } + + let adds = writer + .flush_and_commit(&mut table) + .await?; + Ok(adds) + }, + Err(e) => { + return Err(ExplorerError::Other(format!("Invalid table path: {:?}", e))); + } + } +} + +fn convert_to_batch(table: &DeltaTable, data: &ExDataFrame, chunk_idx: usize) -> Result, ExplorerError> { + let metadata = table.get_metadata()?; + if let Ok(arrow_schema) = >::try_from( + &metadata.schema.clone(), + ) { + let arrow_schema_ref = Arc::new(arrow_schema); + + let arrow_array: Vec = data.get_columns().iter().map(|series| { + // todo: deltalake: cast array type without copying data, e.g,: + // from: &crate::dataframe::arrow::array::Int32Array, + // to: &deltalake::arrow::array::Int32Array + + // attemp 1: + // failed: downcast_ref() returns None + // let arrow_series = series.to_arrow(chunk_idx); + // let arrow_series = arrow_series.as_any(); + // let arrow_series = arrow_series.downcast_ref::().unwrap(); + + // attemp 2: + // failed: segmentation fault + let arrow_series = series.to_arrow(chunk_idx); + let arrow_series = arrow_series.as_ref(); + let s: Arc = unsafe { std::mem::transmute(arrow_series) }; + s + }).collect(); + + Ok(Some(RecordBatch::try_new(arrow_schema_ref, arrow_array).expect("Failed to create RecordBatch"))) + } else { + Err(ExplorerError::Other(format!("Failed to convert Delta Lake schema to Arrow schema"))) + } +} + // ============ Parquet ============ // #[rustler::nif(schedule = "DirtyIo")] diff --git a/native/explorer/src/error.rs b/native/explorer/src/error.rs index b37b84ce1..c3dea01a1 100644 --- a/native/explorer/src/error.rs +++ b/native/explorer/src/error.rs @@ -22,6 +22,8 @@ pub enum ExplorerError { Utf8(#[from] std::string::FromUtf8Error), #[error("Polars Error: {0}")] Polars(#[from] polars::prelude::PolarsError), + #[error("DeltaTable Error: {0}")] + DeltaTable(#[from] deltalake::DeltaTableError), #[error("Internal Error: {0}")] Internal(String), #[error("Generic Error: {0}")] diff --git a/native/explorer/src/lib.rs b/native/explorer/src/lib.rs index 3b37d383b..88ec7af87 100644 --- a/native/explorer/src/lib.rs +++ b/native/explorer/src/lib.rs @@ -128,6 +128,7 @@ rustler::init!( df_transpose, df_to_csv, df_to_csv_cloud, + df_to_delta, df_to_dummies, df_to_ipc, df_to_ipc_cloud,