From 764b5e0bf80a8d967f16e98615d8927f34eb9ee5 Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Wed, 15 Jan 2025 15:07:27 -0500 Subject: [PATCH] dekaf: Implement integration tests for round-tripping data through Dekaf Then implement some tests to validate field selection logic --- Cargo.lock | 493 +++++++++++- Cargo.toml | 7 + crates/dekaf/Cargo.toml | 2 +- crates/dekaf/tests/dekaf_integration_test.rs | 728 ++++++++++++++++++ ...__field_selection_flow_document-key-0.snap | 10 + ...field_selection_flow_document-value-0.snap | 20 + ...st__field_selection_recommended-key-0.snap | 10 + ...__field_selection_recommended-value-0.snap | 12 + ..._test__field_selection_specific-key-0.snap | 10 + ...est__field_selection_specific-value-0.snap | 7 + ...ation_test__fields_not_required-key-0.snap | 9 + ...ion_test__fields_not_required-value-0.snap | 11 + ...tegration_test__meta_is_deleted-key-0.snap | 10 + ...tegration_test__meta_is_deleted-key-1.snap | 10 + ...gration_test__meta_is_deleted-value-0.snap | 10 + ...gration_test__meta_is_deleted-value-1.snap | 10 + supabase/seed.sql | 11 + 17 files changed, 1350 insertions(+), 20 deletions(-) create mode 100644 crates/dekaf/tests/dekaf_integration_test.rs create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-key-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-value-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-key-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-value-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-key-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-value-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-key-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-value-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-1.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-0.snap create mode 100644 crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-1.snap diff --git a/Cargo.lock b/Cargo.lock index 57a93fbafc6..e23eb0a7dc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -33,6 +33,7 @@ version = "0.15.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a93b8a41dbe230ad5087cc721f8d41611de654542180586b315d9f4cf6b72bef" dependencies = [ + "psl", "psl-types", ] @@ -136,7 +137,7 @@ dependencies = [ "proto-grpc", "rand 0.8.5", "regex", - "reqwest", + "reqwest 0.11.27", "runtime", "rustls 0.23.10", "schemars", @@ -371,7 +372,31 @@ dependencies = [ "strum 0.25.0", "strum_macros 0.25.3", "thiserror", - "typed-builder", + "typed-builder 0.16.2", + "uuid 1.10.0", +] + +[[package]] +name = "apache-avro" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aef82843a0ec9f8b19567445ad2421ceeb1d711514384bdd3d49fe37102ee13" +dependencies = [ + "bigdecimal 0.4.7", + "digest", + "libflate", + "log", + "num-bigint", + "quad-rand", + "rand 0.8.5", + "regex-lite", + "serde", + "serde_bytes", + "serde_json", + "strum 0.26.3", + "strum_macros 0.26.4", + "thiserror", + "typed-builder 0.19.1", "uuid 1.10.0", ] @@ -610,7 +635,7 @@ dependencies = [ "hmac", "http-types", "hyper 0.14.30", - "hyper-tls", + "hyper-tls 0.5.0", "serde", "serde_json", "serde_path_to_error", @@ -696,7 +721,7 @@ dependencies = [ name = "avro" version = "0.0.0" dependencies = [ - "apache-avro", + "apache-avro 0.16.0", "doc", "hexdump", "insta", @@ -897,6 +922,20 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bigdecimal" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f31f3af01c5c65a07985c804d3366560e6fa7883d640a122819b14ec327482c" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", + "serde", +] + [[package]] name = "billing-integrations" version = "0.0.0" @@ -1066,7 +1105,7 @@ dependencies = [ "models", "ops", "proto-flow", - "reqwest", + "reqwest 0.11.27", "runtime", "rusqlite", "sources", @@ -1872,6 +1911,20 @@ dependencies = [ "parking_lot_core 0.9.10", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -1939,7 +1992,8 @@ dependencies = [ "aes-siv", "allocator", "anyhow", - "apache-avro", + "apache-avro 0.16.0", + "async-process", "async-trait", "avro", "axum", @@ -1954,17 +2008,20 @@ dependencies = [ "doc", "extractors", "flow-client", + "flowctl", "futures", "gazette", "hex", "hexdump", "humantime", + "insta", "itertools 0.10.5", "json", "jsonwebtoken", "kafka-protocol", "labels", "lazy_static", + "locate-bin", "lz4_flex", "md5", "metrics", @@ -1977,16 +2034,20 @@ dependencies = [ "proto-flow", "proto-gazette", "rand 0.8.5", + "rdkafka", "regex", + "reqwest 0.11.27", "rsasl", "rustls 0.23.10", "rustls-native-certs 0.7.2", "rustls-pemfile 2.1.3", + "schema_registry_converter", "schemars", "serde", "serde_json", "simd-doc", "socket2", + "tempfile", "time 0.3.36", "tokio", "tokio-rustls 0.26.0", @@ -1996,6 +2057,7 @@ dependencies = [ "tracing", "tracing-record-hierarchical", "tracing-subscriber", + "tracing-test", "tuple", "typestate", "unseal", @@ -2154,10 +2216,10 @@ version = "0.0.0" dependencies = [ "allocator", "base64 0.13.1", - "bigdecimal", + "bigdecimal 0.3.1", "bumpalo", "bytes", - "fancy-regex", + "fancy-regex 0.10.0", "futures", "fxhash", "hexdump", @@ -2199,6 +2261,12 @@ version = "0.15.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "dunce" version = "1.0.4" @@ -2258,6 +2326,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "erased-serde" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c138974f9d5e7fe373eb04df7cae98833802ae4b11c24ac7039a21d5af4b26c" +dependencies = [ + "serde", +] + [[package]] name = "errno" version = "0.3.9" @@ -2317,6 +2394,16 @@ dependencies = [ "regex", ] +[[package]] +name = "fancy-regex" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b95f7c0680e4142284cf8b22c14a476e87d61b004a3a0861872b32ef7ead40a2" +dependencies = [ + "bit-set", + "regex", +] + [[package]] name = "fastrand" version = "1.9.0" @@ -2385,7 +2472,7 @@ dependencies = [ "postgrest", "proto-flow", "proto-gazette", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "time 0.3.36", @@ -2468,7 +2555,7 @@ dependencies = [ "proto-gazette", "proto-grpc", "rand 0.8.5", - "reqwest", + "reqwest 0.11.27", "runtime", "rusqlite", "rustls 0.23.10", @@ -2689,7 +2776,7 @@ dependencies = [ "proto-gazette", "proto-grpc", "rand 0.8.5", - "reqwest", + "reqwest 0.11.27", "serde_json", "simd-doc", "thiserror", @@ -3160,6 +3247,22 @@ dependencies = [ "tokio-native-tls", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.7" @@ -3451,10 +3554,10 @@ name = "json" version = "0.0.0" dependencies = [ "addr", - "bigdecimal", + "bigdecimal 0.3.1", "bitvec 0.19.6", "criterion", - "fancy-regex", + "fancy-regex 0.10.0", "fxhash", "glob", "iri-string", @@ -3482,6 +3585,25 @@ dependencies = [ "treediff", ] +[[package]] +name = "json-pointer" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fe841b94e719a482213cee19dd04927cf412f26d8dc84c5a446c081e49c2997" +dependencies = [ + "serde_json", +] + +[[package]] +name = "jsonway" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "effcb749443c905fbaef49d214f8b1049c240e0adb7af9baa0e201e625e4f9de" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "jsonwebtoken" version = "9.3.0" @@ -3640,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if 1.0.0", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -3697,6 +3819,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdc53a7799a7496ebc9fd29f31f7df80e83c9bda5299768af5f9e59eeea74647" dependencies = [ "cc", + "libc", "pkg-config", "vcpkg", ] @@ -4090,6 +4213,7 @@ checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" dependencies = [ "num-integer", "num-traits", + "serde", ] [[package]] @@ -4158,6 +4282,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e613fc340b2220f734a8595782c551f1250e969d87d3be1ae0579e8d4065179" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "object" version = "0.36.3" @@ -4373,7 +4518,7 @@ dependencies = [ name = "parser" version = "0.0.0" dependencies = [ - "apache-avro", + "apache-avro 0.16.0", "assert_cmd", "base64 0.13.1", "bytecount", @@ -4569,6 +4714,44 @@ dependencies = [ "indexmap 2.3.0", ] +[[package]] +name = "phf" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aef8048c789fa5e851558d709946d6d79a8ff88c0440c587967f8e94bfb1216a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d" +dependencies = [ + "phf_shared", + "rand 0.8.5", +] + +[[package]] +name = "phf_shared" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -4656,7 +4839,7 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a966c650b47a064e7082170b4be74fca08c088d893244fc4b70123e3c1f3ee7" dependencies = [ - "reqwest", + "reqwest 0.11.27", ] [[package]] @@ -4721,6 +4904,15 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "proc-macro-crate" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -4792,7 +4984,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.13.0", "log", "multimap", @@ -4932,6 +5124,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "psl" +version = "2.1.78" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "923004c561dc4ed49d1e19cbf2af39780666b32d99941dafe8561f311daffd3d" +dependencies = [ + "psl-types", +] + [[package]] name = "psl-types" version = "2.0.11" @@ -5170,6 +5371,36 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rdkafka" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" +dependencies = [ + "futures-channel", + "futures-util", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.8.0+2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -5328,6 +5559,45 @@ dependencies = [ "winreg", ] +[[package]] +name = "reqwest" +version = "0.12.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" +dependencies = [ + "base64 0.22.1", + "bytes", + "futures-core", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-tls 0.6.0", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls-pemfile 2.1.3", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "windows-registry", +] + [[package]] name = "ring" version = "0.17.8" @@ -5662,6 +5932,24 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "schema_registry_converter" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcc3cf40651cf503827a34bcd7efbbd4750a7e3adc6768bb8089977e4d07303b" +dependencies = [ + "apache-avro 0.17.0", + "byteorder", + "dashmap 6.1.0", + "futures", + "reqwest 0.12.9", + "serde", + "serde_json", + "tokio", + "url", + "valico", +] + [[package]] name = "schemalate" version = "0.0.0" @@ -5820,6 +6108,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "serde_bytes" +version = "0.11.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "387cc504cb06bb40a96c8e04e951fe01854cf6bc921053c954e4a606d9675c6a" +dependencies = [ + "serde", +] + [[package]] name = "serde_cbor" version = "0.11.2" @@ -5952,7 +6249,7 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153" dependencies = [ - "dashmap", + "dashmap 5.5.3", "futures", "lazy_static", "log", @@ -6071,6 +6368,12 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1de1d4f81173b03af4c0cbed3c898f6bff5b870e4a7f5d6f4057d62a7a4b686e" +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "size" version = "0.4.1" @@ -6323,6 +6626,12 @@ version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" +[[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + [[package]] name = "strum_macros" version = "0.24.3" @@ -6349,6 +6658,19 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.74", +] + [[package]] name = "subtle" version = "2.6.1" @@ -6394,6 +6716,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "system-configuration" @@ -6723,6 +7048,23 @@ dependencies = [ "serde", ] +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" + +[[package]] +name = "toml_edit" +version = "0.22.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae48d6208a266e853d946088ed816055e556cc6028c5e8e2b84d9fa5dd7c7f5" +dependencies = [ + "indexmap 2.3.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tonic" version = "0.12.1" @@ -6920,6 +7262,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn 2.0.74", +] + [[package]] name = "treediff" version = "4.0.3" @@ -6978,7 +7341,16 @@ version = "0.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34085c17941e36627a879208083e25d357243812c30e7d7387c3b954f30ade16" dependencies = [ - "typed-builder-macro", + "typed-builder-macro 0.16.2", +] + +[[package]] +name = "typed-builder" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06fbd5b8de54c5f7c91f6fe4cebb949be2125d7758e630bb58b1d831dbce600" +dependencies = [ + "typed-builder-macro 0.19.1", ] [[package]] @@ -6992,6 +7364,17 @@ dependencies = [ "syn 2.0.74", ] +[[package]] +name = "typed-builder-macro" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9534daa9fd3ed0bd911d462a37f172228077e7abf18c18a5f67199d959205f8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "typenum" version = "1.17.0" @@ -7108,6 +7491,15 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "uritemplate-next" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcde98d1fc3f528255b1ecb22fb688ee0d23deb672a8c57127df10b98b4bd18c" +dependencies = [ + "regex", +] + [[package]] name = "url" version = "2.5.2" @@ -7151,6 +7543,30 @@ dependencies = [ "serde", ] +[[package]] +name = "valico" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca8a0a4df97f827fcbcbe69c65364acddddf3a4bb50e6507f63361177a7ea7a4" +dependencies = [ + "addr", + "base64 0.21.7", + "chrono", + "downcast-rs", + "erased-serde", + "fancy-regex 0.11.0", + "json-pointer", + "jsonway", + "percent-encoding", + "phf", + "phf_codegen", + "serde", + "serde_json", + "uritemplate-next", + "url", + "uuid 1.10.0", +] + [[package]] name = "validation" version = "0.0.0" @@ -7539,6 +7955,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.42.0" @@ -7744,6 +8190,15 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.6.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8d71a593cc5c42ad7876e2c1fda56f314f3754c084128833e64f1345ff8a03a" +dependencies = [ + "memchr", +] + [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index 3190a990039..d9e7fbd7fc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -228,7 +228,14 @@ insta = { version = "1.20", features = ["redactions", "json", "yaml"] } pretty_assertions = "1.4.0" rand = { version = "0.8", features = ["small_rng"] } rand_distr = { version = "0.4" } +rdkafka = "0.37" +schema_registry_converter = { version = "4.2.0", features = [ + "easy", + "avro", + "json", +] } serial_test = "0.9" +tracing-test = "0.2.5" wasm-bindgen-test = "0.3.13" # Used exclusively as build-dependencies diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml index a44103cb16e..324bc9656a9 100644 --- a/crates/dekaf/Cargo.toml +++ b/crates/dekaf/Cargo.toml @@ -14,7 +14,6 @@ allocator = { path = "../allocator" } avro = { path = "../avro" } doc = { path = "../doc" } extractors = { path = "../extractors" } -tuple = { path = "../tuple" } flow-client = { path = "../flow-client" } gazette = { path = "../gazette" } json = { path = "../json" } @@ -24,6 +23,7 @@ ops = { path = "../ops" } proto-flow = { path = "../proto-flow" } proto-gazette = { path = "../proto-gazette" } simd-doc = { path = "../simd-doc" } +tuple = { path = "../tuple" } unseal = { path = "../unseal" } aes-siv = { workspace = true } diff --git a/crates/dekaf/tests/dekaf_integration_test.rs b/crates/dekaf/tests/dekaf_integration_test.rs new file mode 100644 index 00000000000..9c198229b34 --- /dev/null +++ b/crates/dekaf/tests/dekaf_integration_test.rs @@ -0,0 +1,728 @@ +use anyhow::Context; +use dekaf::connector::DekafConfig; +use futures::StreamExt; +use locate_bin; +use rand::Rng; +use rdkafka::{consumer::Consumer, Message}; +use schema_registry_converter::async_impl::avro; +use schema_registry_converter::async_impl::schema_registry; +use serde_json::json; +use std::{collections::HashMap, env, io::Write, time::Duration}; + +async fn create_consumer<'a>( + username: &'a str, + token: &'a str, + topic: &'a str, +) -> (rdkafka::consumer::StreamConsumer, avro::AvroDecoder<'a>) { + #[allow(non_snake_case)] + let DEKAF_BROKER = env::var("DEKAF_BROKER").expect("Missing DEKAF_BROKER environment variable"); + #[allow(non_snake_case)] + let SCHEMA_REGISTRY = + env::var("DEKAF_REGISTRY").expect("Missing DEKAF_REGISTRY environment variable"); + + let consumer: rdkafka::consumer::StreamConsumer = rdkafka::ClientConfig::new() + .set("bootstrap.servers", DEKAF_BROKER) + .set("security.protocol", "SASL_PLAINTEXT") + .set("sasl.mechanism", "PLAIN") + .set("sasl.username", username) + .set("sasl.password", token) + .set("group.id", "this_needs_to_be_set_but_we_dont_use_it") + .set("enable.auto.commit", "false") + .set("auto.offset.reset", "smallest") + .set("enable.auto.offset.store", "false") + .create() + .expect("Consumer creation failed"); + + consumer + .subscribe(vec![topic].as_slice()) + .expect("Consumer subscription failed"); + + let decoder = avro::AvroDecoder::new( + schema_registry::SrSettings::new_builder(String::from(SCHEMA_REGISTRY)) + .set_basic_authorization(username, Some(token)) + .build() + .expect("failed to build avro decoder"), + ); + + (consumer, decoder) +} + +#[derive(Debug)] +enum SpecAction { + Create, + Delete, +} + +async fn test_specs( + name: &str, + action: SpecAction, + mut capture: models::CaptureDef, + collections: HashMap<&str, models::CollectionDef>, + mut materialization: models::MaterializationDef, +) -> anyhow::Result<(String, String)> { + let mut temp_flow = tempfile::NamedTempFile::new()?; + + let suffix: String = rand::thread_rng() + .sample_iter(&rand::distributions::Alphanumeric) + .take(4) + .map(char::from) + .collect(); + + let capture_name = format!("{}/{suffix}/source-http-ingest", name); + let materialization_name = format!("{}/{suffix}/test-dekaf", name); + + tracing::info!(temp=?temp_flow, "Attempting to {:?}", action); + + // rewrite capture bindings + capture.bindings.iter_mut().for_each(|binding| { + binding.target = models::Collection::new(format!("{name}/{}", binding.target.to_string())) + }); + + // rewrite materialization sources + materialization.bindings.iter_mut().for_each(|binding| { + binding + .source + .set_collection(models::Collection::new(format!( + "{name}/{}", + binding.source.collection() + ))) + }); + + let collections_mapped = collections + .into_iter() + .fold(HashMap::new(), |mut state, (k, v)| { + state.insert(format!("{name}/{k}"), v); + state + }); + + let file_contents = json!({ + "captures": { + &capture_name: capture + }, + "collections": collections_mapped, + "materializations": { + &materialization_name: materialization + } + }); + + temp_flow.write(serde_json::to_vec(&file_contents)?.as_slice())?; + + let flowctl = locate_bin::locate("flowctl").context("failed to locate flowctl")?; + + let async_process::Output { + stderr, + stdout: _stdout, + status, + } = async_process::output( + async_process::Command::new(flowctl).args( + match action { + SpecAction::Create => vec![ + "catalog", + "publish", + "--auto-approve", + "--default-data-plane", + "ops/dp/public/local-cluster", + "--source", + temp_flow.path().to_str().unwrap(), + ], + SpecAction::Delete => vec![ + "catalog", + "delete", + "--prefix", + name, + "--captures=true", + "--collections=true", + "--materializations=true", + "--dangerous-auto-approve", + ], + } + .as_slice(), + ), + ) + .await + .context("failed to invoke flowctl")?; + + if !status.success() { + let output = String::from_utf8_lossy(&stderr); + if !output.contains("no specs found matching given selector") { + anyhow::bail!("flowctl failed: {}", output); + } + } + + tracing::info!( + ?capture_name, + ?materialization_name, + "Successful {:?}", + action + ); + + Ok((capture_name, materialization_name)) +} + +async fn sops_encrypt(input: models::RawValue) -> anyhow::Result { + #[allow(non_snake_case)] + let KEYRING = env::var("SOPS_KEYRING").unwrap_or( + "projects/estuary-control/locations/us-central1/keyRings/sops/cryptoKeys/cd-github-control" + .to_string(), + ); + + let sops = locate_bin::locate("sops").context("failed to locate sops")?; + + let async_process::Output { + stderr, + stdout, + status, + } = async_process::input_output( + async_process::Command::new(sops).args([ + "--encrypt", + "--input-type", + "json", + "--output-type", + "json", + "--gcp-kms", + &KEYRING, + "--encrypted-suffix", + "_sops", + "/dev/stdin", + ]), + input.get().as_bytes(), + ) + .await + .context("failed to run sops")?; + + if !status.success() { + anyhow::bail!( + "decrypting sops document failed: {}", + String::from_utf8_lossy(&stderr), + ); + } + + Ok(models::RawValue::from_string( + std::str::from_utf8(stdout.as_slice()) + .context("failed to parse sops output")? + .to_string(), + )?) +} + +async fn get_shard_info( + task_name: &str, +) -> anyhow::Result { + let flowctl = locate_bin::locate("flowctl").context("failed to locate flowctl")?; + + let async_process::Output { + stderr, + stdout, + status, + } = async_process::output(async_process::Command::new(flowctl).args([ + "raw", + "list-shards", + "--task", + task_name, + "-ojson", + ])) + .await + .context("failed to list shards")?; + + if !status.success() { + anyhow::bail!( + "listing shards failed: {}", + String::from_utf8_lossy(&stderr), + ); + } + + Ok(serde_json::from_slice(&stdout)?) +} + +async fn wait_for_primary(task_name: &str) -> anyhow::Result<()> { + loop { + match get_shard_info(task_name).await { + Err(e) => { + tracing::warn!(?e, "Error getting shard info"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + Ok(info) + if info.status.iter().any(|status| { + status.code() == gazette::consumer::replica_status::Code::Primary + }) => + { + return Ok(()) + } + Ok(info) + if info.status.iter().any(|status| { + status.code() == gazette::consumer::replica_status::Code::Failed + }) => + { + tracing::warn!(statuses = ?info.status, "Shard failed"); + anyhow::bail!("Shard failed"); + } + Ok(info) => { + tracing::info!(statuses = ?info.status,"Waiting for primary"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + } + } + } +} + +async fn send_docs(task_name: &str, path: &str, docs: Vec) -> anyhow::Result<()> { + let shard = get_shard_info(task_name).await?; + + let shard_endpoint = shard + .route + .context("missing shard route")? + .endpoints + .first() + .context("missing shard endpoint")? + .replace("https://", ""); + let shard_labels = shard + .spec + .context("missing shard spec")? + .labels + .context("missing shard labels")? + .labels; + + let hostname = &shard_labels + .iter() + .find(|lab| lab.name == labels::HOSTNAME) + .context("missing HOSTNAME label")? + .value; + let port = &shard_labels + .iter() + .find(|lab| lab.name == labels::EXPOSE_PORT) + .context("missing HOSTNAME label")? + .value; + + let client = reqwest::Client::builder() + .danger_accept_invalid_certs(true) + .build()?; + let url = format!("https://{hostname}-{port}.{shard_endpoint}/{path}"); + + tracing::info!(url, "Sending docs"); + + for doc in docs { + let response = client + .post(url.clone()) + .header("Content-Type", "application/json") + .body(doc.get().to_string()) + .send() + .await + .context("failed to send document")?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("failed to send document: status={}, body={}", status, body); + } + } + + Ok(()) +} + +// Note: For the moment, this is going to use an externally-running `agent`, either from Tilt +// or the one running in prod. That means that any changes to `dekaf::connector` will not get +// tested, as that functionality gets baked into the agent/runtime directly. An improvement +// to these tests would be to additionally run/test against a local build of the latest `agent`. +async fn roundtrip( + name: String, + endpoint_config: DekafConfig, + schema: serde_json::Value, + field_selection: serde_json::Value, + docs: Vec, +) -> anyhow::Result<()> { + // Create test collection with specific schema + // Create test Dekaf materialization + let materialization_config = sops_encrypt(models::RawValue::from_value(&serde_json::to_value( + endpoint_config.clone(), + )?)) + .await?; + + let capture: models::CaptureDef = serde_json::from_value(json!({ + "endpoint": { + "connector": { + "image": "ghcr.io/estuary/source-http-ingest:dev", + "config": sops_encrypt(models::RawValue::from_value(&json!({ + "paths": ["/data"] + }))).await? + } + }, + "bindings": [{ + "resource": { + "path": "/data", + "stream": "/data" + }, + "target": "single_collection" + }] + }))?; + + let collections = HashMap::from_iter( + vec![("single_collection", serde_json::from_value(schema)?)].into_iter(), + ); + + let materialization: models::MaterializationDef = serde_json::from_value(json!({ + "endpoint": { + "dekaf": { + "variant": "foo", + "config": materialization_config + } + }, + "bindings": [ + { + "resource": { + "topic_name": "test_topic" + }, + "source": "single_collection", + "fields": field_selection + } + ] + }))?; + + let task_name_prefix = format!("test/dekaf_testing/{name}"); + + test_specs( + &task_name_prefix, + SpecAction::Delete, + capture.clone(), + collections.clone(), + materialization.clone(), + ) + .await?; + + let (capture_name, materialization_name) = test_specs( + &task_name_prefix, + SpecAction::Create, + capture.clone(), + collections.clone(), + materialization.clone(), + ) + .await?; + + wait_for_primary(&capture_name).await?; + + tracing::info!("Capture is primary"); + + send_docs( + &capture_name, + "data", + docs.iter() + .map(models::RawValue::from_value) + .collect::>(), + ) + .await?; + + tracing::info!("Sent test docs"); + + // Consume test documents + let (consumer, decoder) = + create_consumer(&materialization_name, &endpoint_config.token, "test_topic").await; + + let mut doc_stream = consumer.stream(); + + let mut counter = 0; + while let Some(consumed) = doc_stream.next().await { + let consumed = consumed?; + + // Connfirm that field selection was applied + + let decoded_key = match decoder.decode(consumed.key()).await { + Err(e) => { + tracing::error!(err=?e, "Error decoding key"); + return Err(anyhow::Error::from(e)); + } + Ok(d) => apache_avro::from_value::(&d.value), + }?; + + insta::assert_json_snapshot!(format!("{name}-key-{counter}"), &decoded_key); + + let decoded_payload = match decoder.decode(consumed.payload()).await { + Err(e) => { + tracing::error!(err=?e, "Error decoding value"); + return Err(anyhow::Error::from(e)); + } + Ok(d) => apache_avro::from_value::(&d.value), + }?; + + insta::assert_json_snapshot!(format!("{name}-value-{counter}"), &decoded_payload, { + ".flow_published_at.json" => "[timestamp]", + ".flow_document._flow_extra._meta.json" => "[contains_timestamp]" + }); + + counter += 1; + if counter >= docs.len() { + break; + } + } + + // Delete test specs + + test_specs( + &task_name_prefix, + SpecAction::Delete, + capture.clone(), + collections.clone(), + materialization.clone(), + ) + .await?; + Ok(()) +} + +#[ignore] +#[tokio::test] +#[tracing_test::traced_test] +async fn test_field_selection_specific() -> anyhow::Result<()> { + roundtrip( + "field_selection_specific".to_string(), + dekaf::connector::DekafConfig { + deletions: dekaf::connector::DeletionMode::Kafka, + strict_topic_names: false, + token: "1234".to_string(), + }, + json!({ + "schema": { + "properties": { + "key": { + "type": "string" + }, + "field_a": { + "type": "string", + }, + "field_b": { + "type": "string", + } + }, + "type": "object", + "required": [ + "key", + "field_a", + "field_b" + ], + }, + "key": [ + "/key" + ] + }), + json!({ + "include": { + "field_a": {} + }, + "recommended": false + }), + vec![json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar" + })], + ) + .await +} + +#[ignore] +#[tokio::test] +#[tracing_test::traced_test] +async fn test_field_selection_recommended() -> anyhow::Result<()> { + roundtrip( + "field_selection_recommended".to_string(), + dekaf::connector::DekafConfig { + deletions: dekaf::connector::DeletionMode::Kafka, + strict_topic_names: false, + token: "1234".to_string(), + }, + json!({ + "schema": { + "properties": { + "key": { + "type": "string" + }, + "field_a": { + "type": "string", + }, + "field_b": { + "type": "string", + } + }, + "type": "object", + "required": [ + "key", + "field_a", + "field_b" + ], + }, + "key": [ + "/key" + ] + }), + json!({ + "recommended": true + }), + vec![json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar" + })], + ) + .await +} + +#[ignore] +#[tokio::test] +#[tracing_test::traced_test] +async fn test_field_selection_flow_document() -> anyhow::Result<()> { + roundtrip( + "field_selection_flow_document".to_string(), + dekaf::connector::DekafConfig { + deletions: dekaf::connector::DeletionMode::Kafka, + strict_topic_names: false, + token: "1234".to_string(), + }, + json!({ + "schema": { + "properties": { + "key": { + "type": "string" + }, + "field_a": { + "type": "string", + }, + "field_b": { + "type": "string", + } + }, + "type": "object", + "required": [ + "key", + "field_a", + "field_b" + ], + }, + "key": [ + "/key" + ] + }), + json!({ + "include": { + "key": {}, + "flow_document": {} + }, + "exclude": [ + "field_a", + "field_b" + ], + "recommended": true + }), + vec![json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar" + })], + ) + .await +} + +#[ignore] +#[tokio::test] +#[tracing_test::traced_test] +async fn test_meta_is_deleted() -> anyhow::Result<()> { + roundtrip( + "meta_is_deleted".to_string(), + dekaf::connector::DekafConfig { + deletions: dekaf::connector::DeletionMode::CDC, + strict_topic_names: false, + token: "1234".to_string(), + }, + json!({ + "schema": { + "properties": { + "key": { + "type": "string" + }, + "field_a": { + "type": "string", + }, + "field_b": { + "type": "string", + }, + "_meta": { + "type": "object", + "properties": { + "op": { + "type": "string" + } + } + } + }, + "type": "object", + "required": [ + "key", + ], + }, + "key": [ + "/key" + ] + }), + json!({ + "include": { + "key": {}, + }, + "recommended": false + }), + vec![ + json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar", + "_meta": { + "op": "c" + } + }), + json!({ + "key": "first", + "field_a": "foo", + "field_b": "bar", + "_meta": { + "op": "d" + } + }), + ], + ) + .await +} + +#[tokio::test] +#[tracing_test::traced_test] +async fn test_fields_not_required() -> anyhow::Result<()> { + roundtrip( + "fields_not_required".to_string(), + dekaf::connector::DekafConfig { + deletions: dekaf::connector::DeletionMode::Kafka, + strict_topic_names: false, + token: "1234".to_string(), + }, + json!({ + "schema": { + "properties": { + "key": { + "type": "string" + }, + "field_a": { + "type": "string", + }, + }, + "type": "object", + "required": [ + "key", + ], + }, + "key": [ + "/key" + ] + }), + json!({ + "recommended": true + }), + vec![json!({ + "key": "first", + // Omitting "field_a" + })], + ) + .await +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-key-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-key-0.snap new file mode 100644 index 00000000000..5289da2bdaa --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-key-0.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +assertion_line: 435 +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-value-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-value-0.snap new file mode 100644 index 00000000000..c3016db55dd --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_flow_document-value-0.snap @@ -0,0 +1,20 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "flow_document": { + "_flow_extra": { + "_meta": { + "json": "[contains_timestamp]" + } + }, + "field_a": "foo", + "field_b": "bar", + "key": "first" + }, + "flow_published_at": { + "json": "[timestamp]" + }, + "key": "first" +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-key-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-key-0.snap new file mode 100644 index 00000000000..5289da2bdaa --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-key-0.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +assertion_line: 435 +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-value-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-value-0.snap new file mode 100644 index 00000000000..feeae7afed2 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_recommended-value-0.snap @@ -0,0 +1,12 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "field_a": "foo", + "field_b": "bar", + "flow_published_at": { + "json": "[timestamp]" + }, + "key": "first" +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-key-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-key-0.snap new file mode 100644 index 00000000000..5289da2bdaa --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-key-0.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +assertion_line: 435 +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-value-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-value-0.snap new file mode 100644 index 00000000000..db7745db15e --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__field_selection_specific-value-0.snap @@ -0,0 +1,7 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "field_a": "foo" +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-key-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-key-0.snap new file mode 100644 index 00000000000..ebcc37355ea --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-key-0.snap @@ -0,0 +1,9 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-value-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-value-0.snap new file mode 100644 index 00000000000..e23574567d9 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__fields_not_required-value-0.snap @@ -0,0 +1,11 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "field_a": null, + "flow_published_at": { + "json": "[timestamp]" + }, + "key": "first" +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-0.snap new file mode 100644 index 00000000000..5289da2bdaa --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-0.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +assertion_line: 435 +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-1.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-1.snap new file mode 100644 index 00000000000..5289da2bdaa --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-key-1.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +assertion_line: 435 +expression: "&decoded_key" +--- +{ + "_flow_key": { + "p1": "first" + } +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-0.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-0.snap new file mode 100644 index 00000000000..2e2f9d299d8 --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-0.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "_meta": { + "is_deleted": 0 + }, + "key": "first" +} diff --git a/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-1.snap b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-1.snap new file mode 100644 index 00000000000..9c4aaa4aabb --- /dev/null +++ b/crates/dekaf/tests/snapshots/dekaf_integration_test__meta_is_deleted-value-1.snap @@ -0,0 +1,10 @@ +--- +source: crates/dekaf/tests/dekaf_integration_test.rs +expression: "&decoded_payload" +--- +{ + "_meta": { + "is_deleted": 1 + }, + "key": "first" +} diff --git a/supabase/seed.sql b/supabase/seed.sql index 8c3eac91008..2a765737c29 100644 --- a/supabase/seed.sql +++ b/supabase/seed.sql @@ -75,6 +75,17 @@ begin returning id strict into connector_id; insert into public.connector_tags (connector_id, image_tag) values (connector_id, ':dev'); + insert into public.connectors (image_name, title, short_description, logo_url, external_url, recommended) values ( + 'ghcr.io/estuary/source-http-ingest', + json_build_object('en-US','HTTP Webhook'), + json_build_object('en-US','Captures HTTP requests sent by webhooks or other clients'), + json_build_object('en-US','https://storage.googleapis.com/estuary-marketing-strapi-uploads/uploads//61de33_Group_22481_150x150_bdfc5a186d/61de33_Group_22481_150x150_bdfc5a186d.png'), + 'https://estuary.dev', + true + ) + returning id strict into connector_id; + insert into public.connector_tags (connector_id, image_tag) values (connector_id, ':dev'); + insert into public.connectors (image_name, title, short_description, logo_url, external_url, recommended) values ( 'ghcr.io/estuary/source-postgres', json_build_object('en-US','PostgreSQL'),