From e516febb4c25392af0a0ab927c2cdb4bb139ade2 Mon Sep 17 00:00:00 2001 From: Jorge Hermo Date: Thu, 6 Mar 2025 18:38:11 +0100 Subject: [PATCH] feat(postgres sink): Add postgres sink (#21248) * feat(sinks): Initial postgres log sink implementation * feat(sinks): Initial postgres log sink implementation * test(sinks): Add integration test for postgres sink * feat(sinks): postgres sink config unit tests * docs(website): update components cue * chore: update spelling checklist * style: cargo fmt * fix: clippy lint * fix: clippy lint * docs: add changelog * docs: add changelog * chore: rename postgres integration tests feature flag * chore: rename postgres integration tests feature flag * docs: include connection pool reference * fix: fix compilation issue in benches * chore: rename feature flag * feat: update * test: refactored test * test: add new test & store timestamp instead of text * test: add new test & store timestamp instead of text * test: add new test & store timestamp instead of text * test: add reference link * test: add more tests * chore: revert changes in postgresql_metrics integration test * refactor: mv postgres integration test utils in test_utils * feat: allow for metric events * chore: update todo * feat: update * chore: update todo * chore: update example * chore: update todo * chore: update feature gate * chore: update components docs * chore: fix typo in Cargo.toml * test: add test for traces * test: typed span in db * fix: clippy lint * fix: clippy lint * test: add primary key violation test * refactor: rename temp_table function * test: reduce trace struct fields * chore: remove todo * feat: remove panic in postgres request creation * feat: retry for some postgres errors * chore: add docs about postgres connection string * chore: update postgres sink documentation * feat: throttle warn * fix: clippy lints * style: apply format * docs: update generated components docs * fix: spelling * fix: semantic.yml * fix: add period to warn message * chore: update changelog * Update src/sinks/postgres/config.rs Co-authored-by: Pavlos Rontidis * chore: remove todo * chore: remove todo * chore: remove todo * chore: remove todo * chore: remove todo * docs: initial postgres sink documentation * docs: add how it works section * docs: update website docs * chore: update CI spelling & Cargo.lock * dd-rust-license-tool write * docs: change delivery to exactly_once --------- Co-authored-by: Pavlos Rontidis --- .github/actions/spelling/expect.txt | 6 +- .github/workflows/semantic.yml | 1 + Cargo.lock | 254 +++++++++ Cargo.toml | 5 + LICENSE-3rdparty.csv | 7 + changelog.d/15765_postgres_sink.feature.md | 3 + scripts/integration/postgres/test.yaml | 2 + src/sinks/clickhouse/integration_tests.rs | 20 +- src/sinks/databend/integration_tests.rs | 8 +- src/sinks/mod.rs | 2 + src/sinks/postgres/config.rs | 153 ++++++ src/sinks/postgres/integration_tests.rs | 484 ++++++++++++++++++ src/sinks/postgres/mod.rs | 7 + src/sinks/postgres/service.rs | 166 ++++++ src/sinks/postgres/sink.rs | 47 ++ src/sources/postgresql_metrics.rs | 28 +- src/test_util/integration.rs | 28 + src/test_util/mod.rs | 7 + .../reference/configuration/sinks/postgres.md | 14 + website/cue/reference.cue | 3 +- .../components/sinks/base/postgres.cue | 286 +++++++++++ .../reference/components/sinks/postgres.cue | 223 ++++++++ website/cue/reference/services/postgres.cue | 10 + website/cue/reference/urls.cue | 5 + website/data/redirects.yaml | 1 + .../partials/docs/component-under-hero.html | 4 + 26 files changed, 1728 insertions(+), 46 
deletions(-) create mode 100644 changelog.d/15765_postgres_sink.feature.md create mode 100644 src/sinks/postgres/config.rs create mode 100644 src/sinks/postgres/integration_tests.rs create mode 100644 src/sinks/postgres/mod.rs create mode 100644 src/sinks/postgres/service.rs create mode 100644 src/sinks/postgres/sink.rs create mode 100644 src/test_util/integration.rs create mode 100644 website/content/en/docs/reference/configuration/sinks/postgres.md create mode 100644 website/cue/reference/components/sinks/base/postgres.cue create mode 100644 website/cue/reference/components/sinks/postgres.cue create mode 100644 website/cue/reference/services/postgres.cue diff --git a/.github/actions/spelling/expect.txt b/.github/actions/spelling/expect.txt index 2753d37cd1d3d..ee745e5c30d5e 100644 --- a/.github/actions/spelling/expect.txt +++ b/.github/actions/spelling/expect.txt @@ -78,7 +78,6 @@ bytesize califrag califragilistic CAROOT -cbor cddl cdylib cef @@ -256,7 +255,6 @@ hostpath hoverable hoverbear httpdeliveryrequestresponse -httpdump httpevent hugepages hugops @@ -467,6 +465,7 @@ rcode rdkafka rdparty readnone +recordset rediss redoctober regexes @@ -541,6 +540,7 @@ spencergilbert spinlock SPOF spog +sqlx srcaddr srcport SREs @@ -592,6 +592,7 @@ threatmanager Throughputs Tiltfile timberio +TIMESTAMPTZ TKEY tlh tmpfs @@ -672,7 +673,6 @@ wtimeout WTS xact xlarge -XMODEM xxs YAMLs YBv diff --git a/.github/workflows/semantic.yml b/.github/workflows/semantic.yml index 175c5d86e67ab..8bee2b21b4ef3 100644 --- a/.github/workflows/semantic.yml +++ b/.github/workflows/semantic.yml @@ -225,6 +225,7 @@ jobs: new_relic_logs sink opentelemetry sink papertrail sink + postgres sink prometheus_exporter sink prometheus_remote_write sink pulsar sink diff --git a/Cargo.lock b/Cargo.lock index cf7183926035d..41e18200dc030 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -761,6 +761,15 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1661,6 +1670,9 @@ name = "bitflags" version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36" +dependencies = [ + "serde", +] [[package]] name = "bitmask-enum" @@ -3157,6 +3169,12 @@ dependencies = [ "tracing 0.1.41", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "duct" version = "0.13.6" @@ -3224,6 +3242,9 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -3428,6 +3449,17 @@ version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "281e452d3bad4005426416cdba5ccfd4f5c1280e10099e21db27f7c1c28347fc" +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "event-listener" version = "2.5.3" @@ 
-3784,6 +3816,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -4179,6 +4222,15 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "hdrhistogram" version = "7.5.4" @@ -5552,6 +5604,16 @@ dependencies = [ "libc", ] +[[package]] +name = "libsqlite3-sys" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" +dependencies = [ + "pkg-config", + "vcpkg", +] + [[package]] name = "libz-sys" version = "1.1.12" @@ -9359,6 +9421,197 @@ dependencies = [ "der", ] +[[package]] +name = "sqlx" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4410e73b3c0d8442c5f99b425d7a435b5ee0ae4167b3196771dd3f7a01be745f" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a007b6936676aa9ab40207cde35daab0a04b823be8ae004368c0793b96a61e0" +dependencies = [ + "bytes 1.10.0", + "chrono", + "crc", + "crossbeam-queue", + "either", + "event-listener 5.3.1", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.2", + "hashlink", + "indexmap 2.7.1", + "log", + "memchr", + "once_cell", + "percent-encoding", + "serde", + "serde_json", + "sha2", + "smallvec", + "thiserror 2.0.3", + "tokio", + "tokio-stream", + "tracing 0.1.41", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3112e2ad78643fef903618d78cf0aec1cb3134b019730edb039b69eaf531f310" +dependencies = [ + "proc-macro2 1.0.93", + "quote 1.0.38", + "sqlx-core", + "sqlx-macros-core", + "syn 2.0.98", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e9f90acc5ab146a99bf5061a7eb4976b573f560bc898ef3bf8435448dd5e7ad" +dependencies = [ + "dotenvy", + "either", + "heck 0.5.0", + "hex", + "once_cell", + "proc-macro2 1.0.93", + "quote 1.0.38", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 2.0.98", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4560278f0e00ce64938540546f59f590d60beee33fffbd3b9cd47851e5fff233" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.8.0", + "byteorder", + "bytes 1.10.0", + "chrono", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.5", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.3", + 
"tracing 0.1.41", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b98a57f363ed6764d5b3a12bfedf62f07aa16e1856a7ddc2a0bb190a959613" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.8.0", + "byteorder", + "chrono", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.3", + "tracing 0.1.41", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f85ca71d3a5b24e64e1d08dd8fe36c6c95c339a896cc33068148906784620540" +dependencies = [ + "atoi", + "chrono", + "flume 0.11.0", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "serde_urlencoded", + "sqlx-core", + "tracing 0.1.41", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -11017,6 +11270,7 @@ dependencies = [ "snafu 0.7.5", "snap", "socket2 0.5.8", + "sqlx", "stream-cancel", "strip-ansi-escapes", "sysinfo", diff --git a/Cargo.toml b/Cargo.toml index 7441d9aa0e08b..c1428a70bd5b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -370,6 +370,7 @@ seahash = { version = "4.1.0", default-features = false } smallvec = { version = "1", default-features = false, features = ["union", "serde"] } snap = { version = "1.1.1", default-features = false } socket2 = { version = "0.5.8", default-features = false } +sqlx = { version = "0.8.3", default-features = false, features = ["derive", "postgres", "chrono", "runtime-tokio"], optional=true } stream-cancel = { version = "0.8.2", default-features = false } strip-ansi-escapes = { version = "0.2.1", default-features = false } syslog = { version = "6.1.1", default-features = false, optional = true } @@ -740,6 +741,7 @@ sinks-logs = [ "sinks-new_relic_logs", "sinks-opentelemetry", "sinks-papertrail", + "sinks-postgres", "sinks-pulsar", "sinks-redis", "sinks-sematext", @@ -809,6 +811,7 @@ sinks-new_relic = [] sinks-opentelemetry = ["sinks-http"] sinks-papertrail = ["dep:syslog"] sinks-prometheus = ["dep:base64", "dep:prost", "vector-lib/prometheus"] +sinks-postgres = ["dep:sqlx"] sinks-pulsar = ["dep:apache-avro", "dep:pulsar", "dep:lru"] sinks-redis = ["dep:redis"] sinks-sematext = ["sinks-elasticsearch", "sinks-influxdb"] @@ -858,6 +861,7 @@ all-integration-tests = [ "nginx-integration-tests", "opentelemetry-integration-tests", "postgresql_metrics-integration-tests", + "postgres_sink-integration-tests", "prometheus-integration-tests", "pulsar-integration-tests", "redis-integration-tests", @@ -922,6 +926,7 @@ nats-integration-tests = ["sinks-nats", "sources-nats"] nginx-integration-tests = ["sources-nginx_metrics"] opentelemetry-integration-tests = ["sources-opentelemetry", "dep:prost"] postgresql_metrics-integration-tests = ["sources-postgresql_metrics"] +postgres_sink-integration-tests = ["sinks-postgres"] prometheus-integration-tests = ["sinks-prometheus", "sources-prometheus", "sinks-influxdb"] pulsar-integration-tests = ["sinks-pulsar", "sources-pulsar"] redis-integration-tests = ["sinks-redis", "sources-redis"] diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 52b29cc6d6d92..dcc150753ab9f 100644 --- 
a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -46,6 +46,7 @@ async-signal,https://github.com/smol-rs/async-signal,Apache-2.0 OR MIT,John Nunl async-stream,https://github.com/tokio-rs/async-stream,MIT,Carl Lerche async-task,https://github.com/smol-rs/async-task,Apache-2.0 OR MIT,Stjepan Glavina async-trait,https://github.com/dtolnay/async-trait,MIT OR Apache-2.0,David Tolnay +atoi,https://github.com/pacman82/atoi-rs,MIT,Markus Klein atomic-waker,https://github.com/smol-rs/atomic-waker,Apache-2.0 OR MIT,"Stjepan Glavina , Contributors to futures-rs" aws-config,https://github.com/smithy-lang/smithy-rs,Apache-2.0,"AWS Rust SDK Team , Russell Cohen " aws-credential-types,https://github.com/smithy-lang/smithy-rs,Apache-2.0,AWS Rust SDK Team @@ -192,6 +193,7 @@ displaydoc,https://github.com/yaahc/displaydoc,MIT OR Apache-2.0,Jane Lusby doc-comment,https://github.com/GuillaumeGomez/doc-comment,MIT,Guillaume Gomez domain,https://github.com/nlnetlabs/domain,BSD-3-Clause,NLnet Labs +dotenvy,https://github.com/allan2/dotenvy,MIT,"Noemi Lapresta , Craig Hills , Mike Piccolo , Alice Maz , Sean Griffin , Adam Sharp , Arpad Borsos , Allan Zhang " dyn-clone,https://github.com/dtolnay/dyn-clone,MIT OR Apache-2.0,David Tolnay ecdsa,https://github.com/RustCrypto/signatures/tree/master/ecdsa,Apache-2.0 OR MIT,RustCrypto Developers ed25519,https://github.com/RustCrypto/signatures/tree/master/ed25519,Apache-2.0 OR MIT,RustCrypto Developers @@ -210,6 +212,7 @@ erased-serde,https://github.com/dtolnay/erased-serde,MIT OR Apache-2.0,David Tol errno,https://github.com/lambda-fairy/rust-errno,MIT OR Apache-2.0,Chris Wong error-chain,https://github.com/rust-lang-nursery/error-chain,MIT OR Apache-2.0,"Brian Anderson , Paul Colomiets , Colin Kiegel , Yamakaky , Andrew Gauger " error-code,https://github.com/DoumanAsh/error-code,BSL-1.0,Douman +etcetera,https://github.com/lunacookies/etcetera,MIT OR Apache-2.0,The etcetera Authors event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,Stjepan Glavina event-listener,https://github.com/smol-rs/event-listener,Apache-2.0 OR MIT,"Stjepan Glavina , John Nunley " event-listener-strategy,https://github.com/smol-rs/event-listener-strategy,Apache-2.0 OR MIT,John Nunley @@ -240,6 +243,7 @@ futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Au futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors futures-core,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-core Authors futures-executor,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-executor Authors +futures-intrusive,https://github.com/Matthias247/futures-intrusive,MIT OR Apache-2.0,Matthias Einwag futures-io,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-io Authors futures-lite,https://github.com/smol-rs/futures-lite,Apache-2.0 OR MIT,"Stjepan Glavina , Contributors to futures-rs" futures-macro,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-macro Authors @@ -267,6 +271,7 @@ half,https://github.com/starkat99/half-rs,MIT OR Apache-2.0,Kathryn Long hashbag,https://github.com/jonhoo/hashbag,MIT OR Apache-2.0,Jon Gjengset hashbrown,https://github.com/rust-lang/hashbrown,MIT OR Apache-2.0,Amanieu d'Antras +hashlink,https://github.com/kyren/hashlink,MIT OR Apache-2.0,kyren headers,https://github.com/hyperium/headers,MIT,Sean McArthur heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,The heck Authors 
heck,https://github.com/withoutboats/heck,MIT OR Apache-2.0,Without Boats @@ -346,6 +351,7 @@ lazy_static,https://github.com/rust-lang-nursery/lazy-static.rs,MIT OR Apache-2. libc,https://github.com/rust-lang/libc,MIT OR Apache-2.0,The Rust Project Developers libflate,https://github.com/sile/libflate,MIT,Takeru Ohta libm,https://github.com/rust-lang/libm,MIT OR Apache-2.0,Jorge Aparicio +libsqlite3-sys,https://github.com/rusqlite/rusqlite,MIT,The rusqlite developers libz-sys,https://github.com/rust-lang/libz-sys,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett , Sebastian Thiel " linked-hash-map,https://github.com/contain-rs/linked-hash-map,MIT OR Apache-2.0,"Stepan Koltsov , Andrew Paseltiner " linked_hash_set,https://github.com/alexheretic/linked-hash-set,Apache-2.0,Alex Butler @@ -591,6 +597,7 @@ spin,https://github.com/mvdnes/spin-rs,MIT,"Mathijs van de Nes , John Ericson , Joshua Barretto " spinning_top,https://github.com/rust-osdev/spinning_top,MIT OR Apache-2.0,Philipp Oppermann spki,https://github.com/RustCrypto/formats/tree/master/spki,Apache-2.0 OR MIT,RustCrypto Developers +sqlx,https://github.com/launchbadge/sqlx,MIT OR Apache-2.0,"Ryan Leckey , Austin Bonander , Chloe Ross , Daniel Akhterov " stable_deref_trait,https://github.com/storyyeller/stable_deref_trait,MIT OR Apache-2.0,Robert Grosse static_assertions,https://github.com/nvzqz/static-assertions-rs,MIT OR Apache-2.0,Nikolai Vazquez static_assertions_next,https://github.com/scuffletv/static-assertions,MIT OR Apache-2.0,Nikolai Vazquez diff --git a/changelog.d/15765_postgres_sink.feature.md b/changelog.d/15765_postgres_sink.feature.md new file mode 100644 index 0000000000000..1e0902378434a --- /dev/null +++ b/changelog.d/15765_postgres_sink.feature.md @@ -0,0 +1,3 @@ +Add a new postgres sink that allows sending log, metric, and trace events to a PostgreSQL database.
+ +authors: jorgehermo9 diff --git a/scripts/integration/postgres/test.yaml b/scripts/integration/postgres/test.yaml index 67aa2ddc10b50..a89049f545948 100644 --- a/scripts/integration/postgres/test.yaml +++ b/scripts/integration/postgres/test.yaml @@ -1,5 +1,6 @@ features: - postgresql_metrics-integration-tests +- postgres_sink-integration-tests test_filter: ::postgres @@ -18,6 +19,7 @@ matrix: # expressions are evaluated using https://github.com/micromatch/picomatch paths: - "src/internal_events/postgresql_metrics.rs" +- "src/sinks/postgres/**" - "src/sources/postgresql_metrics.rs" - "src/sources/util/**" - "scripts/integration/postgres/**" diff --git a/src/sinks/clickhouse/integration_tests.rs b/src/sinks/clickhouse/integration_tests.rs index 69af61a768a96..0383db043458f 100644 --- a/src/sinks/clickhouse/integration_tests.rs +++ b/src/sinks/clickhouse/integration_tests.rs @@ -26,7 +26,7 @@ use crate::{ sinks::util::{BatchConfig, Compression, TowerRequestConfig}, test_util::{ components::{run_and_assert_sink_compliance, SINK_TAGS}, - random_string, trace_init, + random_table_name, trace_init, }, }; @@ -38,7 +38,7 @@ fn clickhouse_address() -> String { async fn insert_events() { trace_init(); - let table = gen_table(); + let table = random_table_name(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -87,7 +87,7 @@ async fn insert_events() { async fn skip_unknown_fields() { trace_init(); - let table = gen_table(); + let table = random_table_name(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -133,7 +133,7 @@ async fn skip_unknown_fields() { async fn insert_events_unix_timestamps() { trace_init(); - let table = gen_table(); + let table = random_table_name(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -192,7 +192,7 @@ async fn insert_events_unix_timestamps() { async fn insert_events_unix_timestamps_toml_config() { trace_init(); - let table = gen_table(); + let table = random_table_name(); let host = clickhouse_address(); let config: ClickhouseConfig = toml::from_str(&format!( @@ -250,7 +250,7 @@ timestamp_format = "unix""#, async fn no_retry_on_incorrect_data() { trace_init(); - let table = gen_table(); + let table = random_table_name(); let host = clickhouse_address(); let mut batch = BatchConfig::default(); @@ -309,7 +309,7 @@ async fn no_retry_on_incorrect_data_warp() { let config = ClickhouseConfig { endpoint: host.parse().unwrap(), - table: gen_table().try_into().unwrap(), + table: random_table_name().try_into().unwrap(), batch, ..Default::default() }; @@ -334,7 +334,7 @@ async fn templated_table() { let n_tables = 2; let table_events: Vec<(String, Event, BatchStatusReceiver)> = (0..n_tables) .map(|_| { - let table = gen_table(); + let table = random_table_name(); let (mut event, receiver) = make_event(); event.as_mut_log().insert("table", table.as_str()); (table, event, receiver) @@ -468,7 +468,3 @@ struct Stats { elapsed: f64, rows_read: usize, } - -fn gen_table() -> String { - format!("test_{}", random_string(10).to_lowercase()) -} diff --git a/src/sinks/databend/integration_tests.rs b/src/sinks/databend/integration_tests.rs index 9c050da165600..cfca418d86848 100644 --- a/src/sinks/databend/integration_tests.rs +++ b/src/sinks/databend/integration_tests.rs @@ -13,7 +13,7 @@ use crate::{ sinks::util::UriSerde, test_util::{ components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, - random_string, trace_init, + random_table_name, trace_init, }, }; @@ -24,10 +24,6 @@ fn databend_endpoint() 
-> String { .unwrap_or_else(|_| "databend://vector:vector@databend:8000?sslmode=disable".into()) } -fn gen_table() -> String { - format!("test_{}", random_string(10).to_lowercase()) -} - fn make_event() -> (Event, BatchStatusReceiver) { let (batch, receiver) = BatchNotifier::new_with_receiver(); let mut event = LogEvent::from("raw log line").with_batch_notifier(&batch); @@ -38,7 +34,7 @@ fn make_event() -> (Event, BatchStatusReceiver) { async fn prepare_config(codec: &str, compression: &str) -> (String, String, DatabendAPIClient) { trace_init(); - let table = gen_table(); + let table = random_table_name(); let endpoint = databend_endpoint(); let _endpoint: UriSerde = endpoint.parse().unwrap(); diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs index 784f74b1e6005..b5a45a462566e 100644 --- a/src/sinks/mod.rs +++ b/src/sinks/mod.rs @@ -88,6 +88,8 @@ pub mod opendal_common; pub mod opentelemetry; #[cfg(feature = "sinks-papertrail")] pub mod papertrail; +#[cfg(feature = "sinks-postgres")] +pub mod postgres; #[cfg(feature = "sinks-prometheus")] pub mod prometheus; #[cfg(feature = "sinks-pulsar")] diff --git a/src/sinks/postgres/config.rs b/src/sinks/postgres/config.rs new file mode 100644 index 0000000000000..c9ebbb05fc855 --- /dev/null +++ b/src/sinks/postgres/config.rs @@ -0,0 +1,153 @@ +use futures::FutureExt; +use tower::ServiceBuilder; +use vector_lib::{ + config::AcknowledgementsConfig, + configurable::{component::GenerateConfig, configurable_component}, + sink::VectorSink, +}; + +use super::{ + service::{PostgresRetryLogic, PostgresService}, + sink::PostgresSink, +}; +use sqlx::{postgres::PgPoolOptions, Pool, Postgres}; + +use crate::{ + config::{Input, SinkConfig, SinkContext}, + sinks::{ + util::{ + BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt, + TowerRequestConfig, UriSerde, + }, + Healthcheck, + }, +}; + +const fn default_pool_size() -> u32 { + 5 +} + +/// Configuration for the `postgres` sink. +#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))] +#[derive(Clone, Default, Debug)] +#[serde(deny_unknown_fields)] +pub struct PostgresConfig { + /// The PostgreSQL server connection string. It can contain the username and password. + /// See [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) about connection strings for more information + /// about valid formats and options that can be used. + pub endpoint: String, + + /// The table that data is inserted into. This table parameter is vulnerable + /// to SQL injection attacks as Vector does not validate or sanitize it, you must not use untrusted input. + /// This parameter will be directly interpolated in the SQL query statement, + /// as table names as parameters in prepared statements are not allowed in PostgreSQL. + pub table: String, + + /// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more + /// information about why a connection pool should be used. + #[serde(default = "default_pool_size")] + pub pool_size: u32, + + /// Event batching behavior. + /// + /// Note that as PostgreSQL's `jsonb_populate_recordset` function is used to insert events, + /// a single event in the batch can make the whole batch to fail. For example, if a single event within the batch triggers + /// a unique constraint violation in the destination table, the whole event batch will fail. 
+ /// + /// As a workaround, [triggers](https://www.postgresql.org/docs/current/sql-createtrigger.html) on constraint violations + /// can be defined at a database level to change the behavior of the insert operation on specific tables. + /// Alternatively, setting `max_events` batch setting to `1` will make each event to be inserted independently, + /// so events that trigger a constraint violation will not affect the rest of the events. + #[configurable(derived)] + #[serde(default)] + pub batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub request: TowerRequestConfig, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::is_default" + )] + pub acknowledgements: AcknowledgementsConfig, +} + +impl GenerateConfig for PostgresConfig { + fn generate_config() -> toml::Value { + toml::from_str( + r#"endpoint = "postgres://user:password@localhost/default" + table = "table" + "#, + ) + .unwrap() + } +} + +#[async_trait::async_trait] +#[typetag::serde(name = "postgres")] +impl SinkConfig for PostgresConfig { + async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let connection_pool = PgPoolOptions::new() + .max_connections(self.pool_size) + .connect(&self.endpoint) + .await?; + + let healthcheck = healthcheck(connection_pool.clone()).boxed(); + + let batch_settings = self.batch.into_batcher_settings()?; + let request_settings = self.request.into_settings(); + + let endpoint_uri: UriSerde = self.endpoint.parse()?; + let service = PostgresService::new( + connection_pool, + self.table.clone(), + endpoint_uri.to_string(), + ); + let service = ServiceBuilder::new() + .settings(request_settings, PostgresRetryLogic) + .service(service); + + let sink = PostgresSink::new(service, batch_settings); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + Input::all() + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(connection_pool: Pool) -> crate::Result<()> { + sqlx::query("SELECT 1").execute(&connection_pool).await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn generate_config() { + crate::test_util::test_generate_config::(); + } + + #[test] + fn parse_config() { + let cfg = toml::from_str::( + r#" + endpoint = "postgres://user:password@localhost/default" + table = "mytable" + "#, + ) + .unwrap(); + assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default"); + assert_eq!(cfg.table, "mytable"); + } +} diff --git a/src/sinks/postgres/integration_tests.rs b/src/sinks/postgres/integration_tests.rs new file mode 100644 index 0000000000000..198132712e300 --- /dev/null +++ b/src/sinks/postgres/integration_tests.rs @@ -0,0 +1,484 @@ +use crate::test_util::integration::postgres::pg_url; +use crate::{ + config::{SinkConfig, SinkContext}, + event::{ObjectMap, TraceEvent, Value}, + sinks::{postgres::PostgresConfig, util::test::load_sink}, + test_util::{ + components::{ + run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS, + }, + random_table_name, trace_init, + }, +}; +use chrono::{DateTime, Utc}; +use futures::stream; +use ordered_float::NotNan; +use serde::{Deserialize, Serialize}; +use sqlx::{Connection, FromRow, PgConnection}; +use std::future::ready; +use vector_lib::event::{ + BatchNotifier, BatchStatus, BatchStatusReceiver, Event, LogEvent, Metric, MetricKind, + MetricValue, +}; +use 
vrl::event_path; + +const POSTGRES_SINK_TAGS: [&str; 2] = ["endpoint", "protocol"]; + +fn timestamp() -> DateTime { + let timestamp = Utc::now(); + // Postgres does not support nanosecond-resolution, so we truncate the timestamp to microsecond-resolution. + // https://www.postgresql.org/docs/current/datatype-datetime.html + DateTime::from_timestamp_micros(timestamp.timestamp_micros()).unwrap() +} + +fn create_event(id: i64) -> Event { + let mut event = LogEvent::from("raw log line"); + event.insert("id", id); + event.insert("host", "example.com"); + let event_payload = event.clone().into_parts().0; + event.insert("payload", event_payload); + event.insert("timestamp", timestamp()); + event.into() +} + +fn create_event_with_notifier(id: i64) -> (Event, BatchStatusReceiver) { + let (batch, receiver) = BatchNotifier::new_with_receiver(); + let event = create_event(id).with_batch_notifier(&batch); + (event, receiver) +} + +fn create_events(count: usize) -> (Vec, BatchStatusReceiver) { + let mut events = (0..count as i64).map(create_event).collect::>(); + let receiver = BatchNotifier::apply_to(&mut events); + (events, receiver) +} + +fn create_metric(name: &str) -> Metric { + Metric::new( + name, + MetricKind::Absolute, + MetricValue::Counter { value: 1.0 }, + ) + .with_namespace(Some("vector")) + .with_tags(Some(metric_tags!("some_tag" => "some_value"))) + .with_timestamp(Some(timestamp())) +} + +fn create_span(resource: &str) -> ObjectMap { + ObjectMap::from([ + ("service".into(), Value::from("a_service")), + ("name".into(), Value::from("a_name")), + ("resource".into(), Value::from(resource)), + ("type".into(), Value::from("a_type")), + ("trace_id".into(), Value::Integer(123)), + ("span_id".into(), Value::Integer(456)), + ("parent_id".into(), Value::Integer(789)), + ("start".into(), Value::from(timestamp())), + ("duration".into(), Value::Integer(1_000_000)), + ("error".into(), Value::Integer(404)), + ( + "meta".into(), + Value::Object(ObjectMap::from([ + ("foo".into(), Value::from("bar")), + ("bar".into(), Value::from("baz")), + ])), + ), + ( + "metrics".into(), + Value::Object(ObjectMap::from([ + ("a_metric".into(), Value::Float(NotNan::new(0.577).unwrap())), + ("_top_level".into(), Value::Float(NotNan::new(1.0).unwrap())), + ])), + ), + ]) +} + +pub fn create_trace(resource: &str) -> TraceEvent { + let mut t = TraceEvent::default(); + t.insert(event_path!("trace_id"), Value::Integer(123)); + t.insert( + event_path!("spans"), + Value::Array(vec![Value::from(create_span(resource))]), + ); + t +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +struct TestEvent { + id: i64, + host: String, + timestamp: DateTime, + message: String, + payload: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +struct TestCounterMetric { + name: String, + namespace: String, + tags: serde_json::Value, + timestamp: DateTime, + kind: String, + counter: serde_json::Value, +} + +#[derive(Debug, Serialize, Deserialize, FromRow)] +struct TestTrace { + trace_id: i64, + spans: Vec, +} + +#[derive(Debug, Serialize, Deserialize, sqlx::Type, FromRow)] +#[sqlx(type_name = "trace_span")] +struct TestTraceSpan { + service: String, + name: String, + resource: String, + r#type: String, + trace_id: i64, + span_id: i64, + parent_id: i64, + start: DateTime, + duration: i64, + error: i64, + meta: serde_json::Value, + metrics: serde_json::Value, +} + +async fn prepare_config() -> (PostgresConfig, String, PgConnection) { + let table = random_table_name(); + let endpoint = pg_url(); + let config_str = 
format!( + r#" + endpoint = "{endpoint}" + table = "{table}" + batch.max_events = 1 + "#, + ); + let (config, _) = load_sink::(&config_str).unwrap(); + + let connection = PgConnection::connect(endpoint.as_str()) + .await + .expect("Failed to connect to Postgres"); + + (config, table, connection) +} + +#[tokio::test] +async fn healthcheck_passes() { + trace_init(); + let (config, _table, _connection) = prepare_config().await; + let (_sink, healthcheck) = config.build(SinkContext::default()).await.unwrap(); + healthcheck.await.unwrap(); +} + +// This test does not actually fail in the healthcheck query, but in the connection pool creation at +// `PostgresConfig::build` +#[tokio::test] +async fn healthcheck_fails() { + trace_init(); + + let table = random_table_name(); + let endpoint = "postgres://user:pass?host=/unknown_socket_path".to_string(); + let config_str = format!( + r#" + endpoint = "{endpoint}" + table = "{table}" + "#, + ); + let (config, _) = load_sink::(&config_str).unwrap(); + + assert!(config.build(SinkContext::default()).await.is_err()); +} + +#[tokio::test] +async fn insert_single_event() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = + format!("CREATE TABLE {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_event, mut receiver) = create_event_with_notifier(0); + let input_log_event = input_event.clone().into_log(); + let expected_value = serde_json::to_value(&input_log_event).unwrap(); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS) + .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_log_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT * FROM {table}"); + let actual_event: TestEvent = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + let actual_value = serde_json::to_value(actual_event).unwrap(); + assert_eq!(expected_value, actual_value); +} + +#[tokio::test] +async fn insert_multiple_events() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = format!( + "CREATE TABLE {table} (id BIGINT, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)" + ); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_events, mut receiver) = create_events(100); + let input_log_events = input_events + .clone() + .into_iter() + .map(Event::into_log) + .collect::>(); + let expected_values = input_log_events + .iter() + .map(|event| serde_json::to_value(event).unwrap()) + .collect::>(); + run_and_assert_sink_compliance(sink, stream::iter(input_events), &POSTGRES_SINK_TAGS).await; + // We drop the event to notify the receiver that the batch was delivered. 
+ std::mem::drop(input_log_events); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT * FROM {table} ORDER BY id"); + let actual_events: Vec = sqlx::query_as(&select_all_sql) + .fetch_all(&mut connection) + .await + .unwrap(); + let actual_values = actual_events + .iter() + .map(|event| serde_json::to_value(event).unwrap()) + .collect::>(); + assert_eq!(expected_values, actual_values); +} + +#[tokio::test] +async fn insert_metric() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = format!( + "CREATE TABLE {table} (name TEXT, namespace TEXT, tags JSONB, timestamp TIMESTAMPTZ, + kind TEXT, counter JSONB)" + ); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let metric = create_metric("counter"); + let expected_metric_value = serde_json::to_value(&metric).unwrap(); + let input_event = Event::from(metric); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS) + .await; + + let select_all_sql = format!("SELECT * FROM {table}"); + let inserted_metric: TestCounterMetric = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + let inserted_metric_value = serde_json::to_value(&inserted_metric).unwrap(); + assert_eq!(inserted_metric_value, expected_metric_value); +} + +#[tokio::test] +async fn insert_trace() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let drop_type_sql = "DROP TYPE IF EXISTS trace_span CASCADE"; + sqlx::query(drop_type_sql) + .execute(&mut connection) + .await + .unwrap(); + let create_trace_span_type_sql = "CREATE TYPE trace_span AS + (service TEXT, name TEXT, resource TEXT, type TEXT, trace_id BIGINT, + span_id BIGINT, parent_id BIGINT, start TIMESTAMPTZ, duration BIGINT, + error BIGINT, meta JSONB, metrics JSONB)"; + sqlx::query(create_trace_span_type_sql) + .execute(&mut connection) + .await + .unwrap(); + let create_table_sql = format!("CREATE TABLE {table} (trace_id BIGINT, spans trace_span[])"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let trace = create_trace("a_resource"); + let expected_trace_value = serde_json::to_value(&trace).unwrap(); + let input_event = Event::from(trace); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS) + .await; + + let select_all_sql = format!("SELECT * FROM {table}"); + let inserted_trace: TestTrace = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + let inserted_trace_value = serde_json::to_value(&inserted_trace).unwrap(); + assert_eq!(inserted_trace_value, expected_trace_value); +} + +// Using null::{table} with jsonb_populate_recordset does not work well with default values, +// it is like inserting null values explicitly, it doesn't use table's default values. 
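For illustration, here is a minimal psql sketch of the default-value behavior described in the comment above; the table and column names are hypothetical and not part of this patch:

CREATE TABLE t (id BIGINT, note TEXT DEFAULT 'default_value');
-- jsonb_populate_recordset yields NULL for keys absent from the JSON, and the sink then
-- inserts that NULL explicitly, so the column DEFAULT is not applied.
INSERT INTO t SELECT * FROM jsonb_populate_recordset(NULL::t, '[{"id": 1}]');
SELECT note FROM t;  -- returns NULL, not 'default_value'
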
+#[tokio::test] +async fn default_columns_are_not_populated() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = format!( + "CREATE TABLE {table} (id BIGINT, not_existing_field TEXT DEFAULT 'default_value')" + ); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_event, mut receiver) = create_event_with_notifier(0); + run_and_assert_sink_compliance( + sink, + stream::once(ready(input_event.clone())), + &POSTGRES_SINK_TAGS, + ) + .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT not_existing_field FROM {table}"); + let inserted_not_existing_field: (Option,) = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + assert_eq!(inserted_not_existing_field.0, None); +} + +#[tokio::test] +async fn extra_fields_are_ignored() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = format!("CREATE TABLE {table} (message TEXT)"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_event, mut receiver) = create_event_with_notifier(0); + let input_log_event = input_event.clone().into_log(); + let expected_value = input_log_event + .get_message() + .unwrap() + .as_str() + .unwrap() + .into_owned(); + + run_and_assert_sink_compliance(sink, stream::once(ready(input_event)), &POSTGRES_SINK_TAGS) + .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_log_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered)); + + let select_all_sql = format!("SELECT * FROM {table}"); + let actual_value: (String,) = sqlx::query_as(&select_all_sql) + .fetch_one(&mut connection) + .await + .unwrap(); + assert_eq!(expected_value, actual_value.0); +} + +#[tokio::test] +async fn insertion_fails_required_field_is_not_present() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = + format!("CREATE TABLE {table} (message TEXT, not_existing_field TEXT NOT NULL)"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let (input_event, mut receiver) = create_event_with_notifier(0); + + run_and_assert_sink_error( + sink, + stream::once(ready(input_event.clone())), + &COMPONENT_ERROR_TAGS, + ) + .await; + // We drop the event to notify the receiver that the batch was delivered. + std::mem::drop(input_event); + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); + + // We ensure that the event was not inserted. 
+ let select_all_sql = format!("SELECT * FROM {table}"); + let first_row: Option<(String, String)> = sqlx::query_as(&select_all_sql) + .fetch_optional(&mut connection) + .await + .unwrap(); + assert_eq!(first_row, None); +} + +#[tokio::test] +async fn insertion_fails_missing_table() { + trace_init(); + + let table = "missing_table".to_string(); + let (mut config, _, _) = prepare_config().await; + config.table = table.clone(); + + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let (input_event, mut receiver) = create_event_with_notifier(0); + + run_and_assert_sink_error( + sink, + stream::once(ready(input_event)), + &COMPONENT_ERROR_TAGS, + ) + .await; + assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected)); +} + +#[tokio::test] +async fn insertion_fails_primary_key_violation() { + trace_init(); + + let (config, table, mut connection) = prepare_config().await; + let (sink, _hc) = config.build(SinkContext::default()).await.unwrap(); + let create_table_sql = + format!("CREATE TABLE {table} (id BIGINT PRIMARY KEY, host TEXT, timestamp TIMESTAMPTZ, message TEXT, payload JSONB)"); + sqlx::query(&create_table_sql) + .execute(&mut connection) + .await + .unwrap(); + + let event = create_event(0); + run_and_assert_sink_error( + sink, + // We send the same event twice to trigger a primary key violation on column `id`. + stream::iter(vec![event.clone(), event]), + &COMPONENT_ERROR_TAGS, + ) + .await; +} diff --git a/src/sinks/postgres/mod.rs b/src/sinks/postgres/mod.rs new file mode 100644 index 0000000000000..ffbe6d9a4b071 --- /dev/null +++ b/src/sinks/postgres/mod.rs @@ -0,0 +1,7 @@ +mod config; +#[cfg(all(test, feature = "postgres_sink-integration-tests"))] +mod integration_tests; +mod service; +mod sink; + +pub use self::config::PostgresConfig; diff --git a/src/sinks/postgres/service.rs b/src/sinks/postgres/service.rs new file mode 100644 index 0000000000000..d1edcdce16505 --- /dev/null +++ b/src/sinks/postgres/service.rs @@ -0,0 +1,166 @@ +use std::num::NonZeroUsize; +use std::task::{Context, Poll}; + +use crate::internal_events::EndpointBytesSent; +use crate::sinks::prelude::{RequestMetadataBuilder, RetryLogic}; +use futures::future::BoxFuture; +use snafu::{ResultExt, Snafu}; +use sqlx::types::Json; +use sqlx::{Pool, Postgres}; +use tower::Service; +use vector_lib::codecs::JsonSerializerConfig; +use vector_lib::event::{Event, EventFinalizers, EventStatus, Finalizable}; +use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata}; +use vector_lib::stream::DriverResponse; +use vector_lib::EstimatedJsonEncodedSizeOf; + +const POSTGRES_PROTOCOL: &str = "postgres"; + +#[derive(Clone)] +pub struct PostgresRetryLogic; + +impl RetryLogic for PostgresRetryLogic { + type Error = PostgresServiceError; + type Response = PostgresResponse; + + fn is_retriable_error(&self, error: &Self::Error) -> bool { + let PostgresServiceError::Postgres { + source: postgres_error, + } = error + else { + return false; + }; + + matches!( + postgres_error, + sqlx::Error::Io(_) | sqlx::Error::PoolTimedOut + ) + } +} + +#[derive(Clone)] +pub struct PostgresService { + connection_pool: Pool, + table: String, + endpoint: String, +} + +impl PostgresService { + pub const fn new(connection_pool: Pool, table: String, endpoint: String) -> Self { + Self { + connection_pool, + table, + endpoint, + } + } +} + +#[derive(Clone)] +pub struct PostgresRequest { + pub events: Vec, + pub finalizers: EventFinalizers, + pub metadata: RequestMetadata, +} + +impl TryFrom> for 
PostgresRequest { + type Error = String; + + fn try_from(mut events: Vec) -> Result { + let finalizers = events.take_finalizers(); + let metadata_builder = RequestMetadataBuilder::from_events(&events); + let events_size = NonZeroUsize::new(events.estimated_json_encoded_size_of().get()) + .ok_or("payload should never be zero length")?; + let metadata = metadata_builder.with_request_size(events_size); + Ok(PostgresRequest { + events, + finalizers, + metadata, + }) + } +} + +impl Finalizable for PostgresRequest { + fn take_finalizers(&mut self) -> EventFinalizers { + self.finalizers.take_finalizers() + } +} + +impl MetaDescriptive for PostgresRequest { + fn get_metadata(&self) -> &RequestMetadata { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut RequestMetadata { + &mut self.metadata + } +} + +pub struct PostgresResponse { + metadata: RequestMetadata, +} + +impl DriverResponse for PostgresResponse { + fn event_status(&self) -> EventStatus { + EventStatus::Delivered + } + + fn events_sent(&self) -> &GroupedCountByteSize { + self.metadata.events_estimated_json_encoded_byte_size() + } + + fn bytes_sent(&self) -> Option { + Some(self.metadata.request_encoded_size()) + } +} + +#[derive(Debug, Snafu)] +pub enum PostgresServiceError { + #[snafu(display("Database error: {source}"))] + Postgres { source: sqlx::Error }, + + #[snafu(display("Serialization error: {source}"))] + VectorCommon { source: vector_common::Error }, +} + +impl Service for PostgresService { + type Response = PostgresResponse; + type Error = PostgresServiceError; + type Future = BoxFuture<'static, Result>; + + fn poll_ready(&mut self, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: PostgresRequest) -> Self::Future { + let service = self.clone(); + let future = async move { + let table = service.table; + let metadata = request.metadata; + let json_serializer = JsonSerializerConfig::default().build(); + let serialized_values = request + .events + .into_iter() + .map(|event| json_serializer.to_json_value(event)) + .collect::, _>>() + .context(VectorCommonSnafu)?; + + sqlx::query(&format!( + "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)" + )) + .bind(Json(serialized_values)) + .execute(&service.connection_pool) + .await + .context(PostgresSnafu)?; + + emit!(EndpointBytesSent { + byte_size: metadata.request_encoded_size(), + protocol: POSTGRES_PROTOCOL, + endpoint: &service.endpoint, + }); + + Ok(PostgresResponse { metadata }) + }; + + Box::pin(future) + } +} diff --git a/src/sinks/postgres/sink.rs b/src/sinks/postgres/sink.rs new file mode 100644 index 0000000000000..5b819e3c5d515 --- /dev/null +++ b/src/sinks/postgres/sink.rs @@ -0,0 +1,47 @@ +use super::service::{PostgresRequest, PostgresRetryLogic, PostgresService}; +use crate::sinks::prelude::*; + +pub struct PostgresSink { + service: Svc, + batch_settings: BatcherSettings, +} + +impl PostgresSink { + pub const fn new( + service: Svc, + batch_settings: BatcherSettings, + ) -> Self { + Self { + service, + batch_settings, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + .batched(self.batch_settings.as_byte_size_config()) + .filter_map(|events| async move { + match PostgresRequest::try_from(events) { + Ok(request) => Some(request), + Err(e) => { + warn!( + message = "Error creating postgres sink's request.", + error = %e, + internal_log_rate_limit=true + ); + None + } + } + }) + .into_driver(self.service) + .run() + .await + } +} + 
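As a sketch of the statement shape that `PostgresService::call` interpolates above (the table name `logs` is a hypothetical example), the whole batch is bound as a single JSONB array parameter and expanded into typed rows matching the destination table:

-- $1 is the JSON-serialized batch, e.g. '[{"id": 1, "message": "first"}, {"id": 2, "message": "second"}]'
INSERT INTO logs
SELECT * FROM jsonb_populate_recordset(NULL::logs, $1);
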
+#[async_trait::async_trait] +impl StreamSink for PostgresSink { + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sources/postgresql_metrics.rs b/src/sources/postgresql_metrics.rs index f4a6f3f4e6418..675dd9e086288 100644 --- a/src/sources/postgresql_metrics.rs +++ b/src/sources/postgresql_metrics.rs @@ -995,36 +995,16 @@ mod tests { #[cfg(all(test, feature = "postgresql_metrics-integration-tests"))] mod integration_tests { - use std::path::PathBuf; - use super::*; use crate::{ event::Event, - test_util::components::{assert_source_compliance, PULL_SOURCE_TAGS}, + test_util::{ + components::{assert_source_compliance, PULL_SOURCE_TAGS}, + integration::postgres::{pg_socket, pg_url}, + }, tls, SourceSender, }; - fn pg_host() -> String { - std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()) - } - - fn pg_socket() -> PathBuf { - std::env::var("PG_SOCKET") - .map(PathBuf::from) - .unwrap_or_else(|_| { - let current_dir = std::env::current_dir().unwrap(); - current_dir - .join("tests") - .join("data") - .join("postgresql-local-socket") - }) - } - - fn pg_url() -> String { - std::env::var("PG_URL") - .unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host())) - } - async fn test_postgresql_metrics( endpoint: String, tls: Option, diff --git a/src/test_util/integration.rs b/src/test_util/integration.rs new file mode 100644 index 0000000000000..106ae44aa5f82 --- /dev/null +++ b/src/test_util/integration.rs @@ -0,0 +1,28 @@ +#[cfg(any( + feature = "postgres_sink-integration-tests", + feature = "postgresql_metrics-integration-tests" +))] +pub mod postgres { + use std::path::PathBuf; + + pub fn pg_host() -> String { + std::env::var("PG_HOST").unwrap_or_else(|_| "localhost".into()) + } + + pub fn pg_socket() -> PathBuf { + std::env::var("PG_SOCKET") + .map(PathBuf::from) + .unwrap_or_else(|_| { + let current_dir = std::env::current_dir().unwrap(); + current_dir + .join("tests") + .join("data") + .join("postgresql-local-socket") + }) + } + + pub fn pg_url() -> String { + std::env::var("PG_URL") + .unwrap_or_else(|_| format!("postgres://vector:vector@{}/postgres", pg_host())) + } +} diff --git a/src/test_util/mod.rs b/src/test_util/mod.rs index 9917da82b89d6..e446b5455d9d4 100644 --- a/src/test_util/mod.rs +++ b/src/test_util/mod.rs @@ -68,6 +68,9 @@ pub mod mock; pub mod compression; pub mod stats; +#[cfg(test)] +pub mod integration; + #[macro_export] macro_rules! assert_downcast_matches { ($e:expr, $t:ty, $v:pat) => {{ @@ -232,6 +235,10 @@ pub fn temp_dir() -> PathBuf { path.join(dir_name) } +pub fn random_table_name() -> String { + format!("test_{}", random_string(10).to_lowercase()) +} + pub fn map_event_batch_stream( stream: impl Stream, batch: Option, diff --git a/website/content/en/docs/reference/configuration/sinks/postgres.md b/website/content/en/docs/reference/configuration/sinks/postgres.md new file mode 100644 index 0000000000000..7e7f2c59fd9b9 --- /dev/null +++ b/website/content/en/docs/reference/configuration/sinks/postgres.md @@ -0,0 +1,14 @@ +--- +title: Postgres +description: Deliver observability data to the [PostgreSQL](https://www.postgresql.org/) database +component_kind: sink +layout: component +tags: ["postgres", "component", "sink", "storage", "logs", "metrics", "traces"] +--- + +{{/* +This doc is generated using: + +1. The template in layouts/docs/component.html +2. The relevant CUE data in cue/reference/components/... 
+*/}} diff --git a/website/cue/reference.cue b/website/cue/reference.cue index 78f6f686a8102..c55461f810916 100644 --- a/website/cue/reference.cue +++ b/website/cue/reference.cue @@ -31,9 +31,10 @@ _values: { // // * `at_least_once` - The event will be delivered at least once and // could be delivered more than once. +// * `exactly_once` - The event will be delivered exactly once. // * `best_effort` - We will make a best effort to deliver the event, // but the event is not guaranteed to be delivered. -#DeliveryStatus: "at_least_once" | "best_effort" +#DeliveryStatus: "at_least_once" | "exactly_once" | "best_effort" // `#DeploymentRoles` clarify when a component should be used under // certain deployment contexts. diff --git a/website/cue/reference/components/sinks/base/postgres.cue b/website/cue/reference/components/sinks/base/postgres.cue new file mode 100644 index 0000000000000..bc20d87ae14f2 --- /dev/null +++ b/website/cue/reference/components/sinks/base/postgres.cue @@ -0,0 +1,286 @@ +package metadata + +base: components: sinks: postgres: configuration: { + acknowledgements: { + description: """ + Controls how acknowledgements are handled for this sink. + + See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled. + + [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/ + """ + required: false + type: object: options: enabled: { + description: """ + Whether or not end-to-end acknowledgements are enabled. + + When enabled for a sink, any source that supports end-to-end + acknowledgements that is connected to that sink waits for events + to be acknowledged by **all connected sinks** before acknowledging them at the source. + + Enabling or disabling acknowledgements at the sink level takes precedence over any global + [`acknowledgements`][global_acks] configuration. + + [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements + """ + required: false + type: bool: {} + } + } + batch: { + description: """ + Event batching behavior. + + Note that as PostgreSQL's `jsonb_populate_recordset` function is used to insert events, + a single event in the batch can make the whole batch to fail. For example, if a single event within the batch triggers + a unique constraint violation in the destination table, the whole event batch will fail. + + As a workaround, [triggers](https://www.postgresql.org/docs/current/sql-createtrigger.html) on constraint violations + can be defined at a database level to change the behavior of the insert operation on specific tables. + Alternatively, setting `max_events` batch setting to `1` will make each event to be inserted independently, + so events that trigger a constraint violation will not affect the rest of the events. + """ + required: false + type: object: options: { + max_bytes: { + description: """ + The maximum size of a batch that is processed by a sink. + + This is based on the uncompressed size of the batched events, before they are + serialized/compressed. + """ + required: false + type: uint: { + default: 10000000 + unit: "bytes" + } + } + max_events: { + description: "The maximum size of a batch before it is flushed." + required: false + type: uint: unit: "events" + } + timeout_secs: { + description: "The maximum age of a batch before it is flushed." + required: false + type: float: { + default: 1.0 + unit: "seconds" + } + } + } + } + endpoint: { + description: """ + The PostgreSQL server connection string. 
It can contain the username and password. + See [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) about connection strings for more information + about valid formats and options that can be used. + """ + required: true + type: string: {} + } + pool_size: { + description: """ + The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more + information about why a connection pool should be used. + """ + required: false + type: uint: default: 5 + } + request: { + description: """ + Middleware settings for outbound requests. + + Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior. + + Note that the retry backoff policy follows the Fibonacci sequence. + """ + required: false + type: object: options: { + adaptive_concurrency: { + description: """ + Configuration of adaptive concurrency parameters. + + These parameters typically do not require changes from the default, and incorrect values can lead to meta-stable or + unstable performance and sink behavior. Proceed with caution. + """ + required: false + type: object: options: { + decrease_ratio: { + description: """ + The fraction of the current value to set the new concurrency limit when decreasing the limit. + + Valid values are greater than `0` and less than `1`. Smaller values cause the algorithm to scale back rapidly + when latency increases. + + **Note**: The new limit is rounded down after applying this ratio. + """ + required: false + type: float: default: 0.9 + } + ewma_alpha: { + description: """ + The weighting of new measurements compared to older measurements. + + Valid values are greater than `0` and less than `1`. + + ARC uses an exponentially weighted moving average (EWMA) of past RTT measurements as a reference to compare with + the current RTT. Smaller values cause this reference to adjust more slowly, which may be useful if a service has + unusually high response variability. + """ + required: false + type: float: default: 0.4 + } + initial_concurrency: { + description: """ + The initial concurrency limit to use. If not specified, the initial limit is 1 (no concurrency). + + Datadog recommends setting this value to your service's average limit if you're seeing that it takes a + long time to ramp up adaptive concurrency after a restart. You can find this value by looking at the + `adaptive_concurrency_limit` metric. + """ + required: false + type: uint: default: 1 + } + max_concurrency_limit: { + description: """ + The maximum concurrency limit. + + The adaptive request concurrency limit does not go above this bound. This is put in place as a safeguard. + """ + required: false + type: uint: default: 200 + } + rtt_deviation_scale: { + description: """ + Scale of RTT deviations which are not considered anomalous. + + Valid values are greater than or equal to `0`, and we expect reasonable values to range from `1.0` to `3.0`. + + When calculating the past RTT average, we also compute a secondary “deviation” value that indicates how variable + those values are. We use that deviation when comparing the past RTT average to the current measurements, so we + can ignore increases in RTT that are within an expected range. This factor is used to scale up the deviation to + an appropriate range. Larger values cause the algorithm to ignore larger increases in the RTT. 
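+							As a rough illustration (not a formula taken from the implementation), with a past average RTT of 100 ms,
+							a measured deviation of 20 ms, and the default scale of `2.5`, samples up to roughly 100 ms + 2.5 × 20 ms = 150 ms
+							would still be treated as within the expected range.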
+							"""
+						required: false
+						type: float: default: 2.5
+					}
+				}
+			}
+			concurrency: {
+				description: """
+					Configuration for outbound request concurrency.
+
+					This can be set either to one of the below enum values or to a positive integer, which denotes
+					a fixed concurrency limit.
+					"""
+				required: false
+				type: {
+					string: {
+						default: "adaptive"
+						enum: {
+							adaptive: """
+								Concurrency is managed by Vector's [Adaptive Request Concurrency][arc] feature.
+
+								[arc]: https://vector.dev/docs/about/under-the-hood/networking/arc/
+								"""
+							none: """
+								A fixed concurrency of 1.
+
+								Only one request can be outstanding at any given time.
+								"""
+						}
+					}
+					uint: {}
+				}
+			}
+			rate_limit_duration_secs: {
+				description: "The time window used for the `rate_limit_num` option."
+				required: false
+				type: uint: {
+					default: 1
+					unit: "seconds"
+				}
+			}
+			rate_limit_num: {
+				description: "The maximum number of requests allowed within the `rate_limit_duration_secs` time window."
+				required: false
+				type: uint: {
+					default: 9223372036854775807
+					unit: "requests"
+				}
+			}
+			retry_attempts: {
+				description: "The maximum number of retries to make for failed requests."
+				required: false
+				type: uint: {
+					default: 9223372036854775807
+					unit: "retries"
+				}
+			}
+			retry_initial_backoff_secs: {
+				description: """
+					The amount of time to wait before attempting the first retry for a failed request.
+
+					After the first retry has failed, the Fibonacci sequence is used to select future backoffs.
+					"""
+				required: false
+				type: uint: {
+					default: 1
+					unit: "seconds"
+				}
+			}
+			retry_jitter_mode: {
+				description: "The jitter mode to use for retry backoff behavior."
+				required: false
+				type: string: {
+					default: "Full"
+					enum: {
+						Full: """
+							Full jitter.
+
+							The random delay is anywhere from 0 up to the maximum current delay calculated by the backoff
+							strategy.
+
+							Incorporating full jitter into your backoff strategy can greatly reduce the likelihood
+							of creating accidental denial of service (DoS) conditions against your own systems when
+							many clients are recovering from a failure state.
+							"""
+						None: "No jitter."
+					}
+				}
+			}
+			retry_max_duration_secs: {
+				description: "The maximum amount of time to wait between retries."
+				required: false
+				type: uint: {
+					default: 30
+					unit: "seconds"
+				}
+			}
+			timeout_secs: {
+				description: """
+					The time a request can take before being aborted.
+
+					Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could
+					create orphaned requests, pile on retries, and result in duplicate data downstream.
+					"""
+				required: false
+				type: uint: {
+					default: 60
+					unit: "seconds"
+				}
+			}
+		}
+	}
+	table: {
+		description: """
+			The table that data is inserted into. This table parameter is vulnerable
+			to SQL injection attacks, as Vector does not validate or sanitize it; never pass untrusted input as this value.
+			This parameter is interpolated directly into the SQL query statement,
+			because PostgreSQL does not allow table names to be passed as parameters in prepared statements.
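+			For example, `table: "logs"` makes the sink insert into a table named `logs`. Because the value is interpolated verbatim,
+			a schema-qualified name such as `app.logs` should also work, assuming that schema and table exist.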
+			"""
+		required: true
+		type: string: {}
+	}
+}
diff --git a/website/cue/reference/components/sinks/postgres.cue b/website/cue/reference/components/sinks/postgres.cue
new file mode 100644
index 0000000000000..a7e8f84d62b59
--- /dev/null
+++ b/website/cue/reference/components/sinks/postgres.cue
@@ -0,0 +1,223 @@
+package metadata
+
+components: sinks: postgres: {
+	title: "PostgreSQL"
+
+	classes: {
+		commonly_used: false
+		delivery:      "exactly_once"
+		development:   "beta"
+		egress_method: "batch"
+		stateful:      false
+	}
+
+	features: {
+		acknowledgements: true
+		auto_generated:   true
+		healthcheck: enabled: true
+		send: {
+			batch: {
+				enabled:      true
+				common:       false
+				max_bytes:    10_000_000
+				timeout_secs: 1.0
+			}
+			request: {
+				enabled: true
+				headers: false
+			}
+			compression: enabled: false
+			encoding: enabled:    false
+			tls: enabled:         false
+			to: {
+				service: services.postgres
+				interface: {
+					socket: {
+						direction: "outgoing"
+						protocols: ["tcp", "unix"]
+						ssl: "optional"
+					}
+				}
+			}
+		}
+	}
+
+	support: {
+		requirements: []
+		warnings: [
+			"""
+			[PostgreSQL's default values](\(urls.postgresql_default_values)) defined in the destination table
+			are not supported. If the ingested event is missing a field which is present as a table column,
+			a `null` value will be inserted for that record even if that column has a default value defined.
+			This is a limitation of the `jsonb_populate_recordset` function of PostgreSQL.
+
+			As a workaround, you can add a `NOT NULL` constraint to the column, so that inserting an event which is missing that field
+			raises a `NOT NULL` constraint violation, and then define a [constraint trigger](\(urls.postgresql_constraint_trigger))
+			that catches the exception and sets the desired default value.
+			""",
+		]
+		notices: []
+	}
+
+	configuration: base.components.sinks.postgres.configuration
+
+	input: {
+		logs: true
+		metrics: {
+			counter:      true
+			distribution: true
+			gauge:        true
+			histogram:    true
+			set:          true
+			summary:      true
+		}
+		traces: true
+	}
+
+	how_it_works: {
+		inserting_events_into_postgres: {
+			title: "Inserting events into PostgreSQL"
+			body: """
+
+				In order to insert data into a PostgreSQL table, you must first create a table that matches
+				the JSON serialization of your event data. Note that this sink accepts `log`, `metric`, and `trace` events,
+				and the insertion behavior is the same for all of them.
+
+				For example, if your event is a log whose JSON serialization would have the following structure:
+				```json
+				{
+				  "host": "localhost",
+				  "message": "239.215.85.26 - AmbientTech [04/Mar/2025:15:09:25 +0100] \"DELETE /observability/metrics/production HTTP/1.0\" 300 37142",
+				  "service": "vector",
+				  "source_type": "demo_logs",
+				  "timestamp": "2025-03-04T14:09:25.883572054Z"
+				}
+				```
+				and you want to store all of those fields, the table should be created as follows:
+				```sql
+				CREATE TABLE logs (
+					host TEXT,
+					message TEXT,
+					service TEXT,
+					source_type TEXT,
+					timestamp TIMESTAMPTZ
+				);
+				```
+				Note that not all event fields must be declared as table columns, only the ones you want to store. If a field is present
+				in the event but not in the table, it is ignored.
+
+				When inserting the event into the table, PostgreSQL does a best-effort job of converting the JSON serialized
+				event to the correct PostgreSQL data types.
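+				As a rough sketch of how a batch is written (an illustration only, not necessarily the sink's exact statement;
+				the placeholder name `$1` is just for the example), inserting a batch of events into the `logs` table above is
+				broadly equivalent to:
+				```sql
+				-- $1 stands for the batch of events serialized as a JSON array.
+				INSERT INTO logs
+				SELECT * FROM jsonb_populate_recordset(NULL::logs, $1);
+				```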
+				The semantics of the insertion follow PostgreSQL's `jsonb_populate_record` function;
+				see the [PostgreSQL documentation](\(urls.postgresql_json_functions)) about that function
+				for more details about the insertion behavior.
+				The correspondence between Vector types and PostgreSQL types can be found
+				in the [`sqlx` crate's documentation](\(urls.postgresql_sqlx_correspondence)).
+
+				#### Practical example
+
+				Spin up a PostgreSQL instance with Docker:
+				```shell
+				docker run -d --name postgres -e POSTGRES_PASSWORD=password123 -p 5432:5432 postgres
+				```
+
+				Create the following PostgreSQL table inside the `test` database:
+				```sql
+				CREATE TABLE logs (
+					message TEXT,
+					payload JSONB,
+					timestamp TIMESTAMPTZ
+				);
+				```
+
+				And the following Vector configuration:
+				```yaml
+				sources:
+				  demo_logs:
+				    type: demo_logs
+				    format: apache_common
+				transforms:
+				  payload:
+				    type: remap
+				    inputs:
+				      - demo_logs
+				    source: |
+				      .payload = .
+				sinks:
+				  postgres:
+				    type: postgres
+				    inputs:
+				      - payload
+				    endpoint: postgres://postgres:password123@localhost/test
+				    table: logs
+				```
+				Then, you can see those log events ingested in the `logs` table.
+
+				#### Composite types
+
+				When using PostgreSQL [composite types](\(urls.postgresql_composite_types)), the sink will attempt to insert the event data into
+				the composite type, following its structure.
+
+				Using the previous example, if you want to store the `payload` column as a composite type instead of `JSONB`,
+				you should create the following composite type:
+				```sql
+				CREATE TYPE payload_type AS (
+					host TEXT,
+					message TEXT,
+					service TEXT,
+					source_type TEXT,
+					timestamp TIMESTAMPTZ
+				);
+				```
+
+				And the table should be created as follows:
+				```sql
+				CREATE TABLE logs (
+					message TEXT,
+					payload payload_type,
+					timestamp TIMESTAMPTZ
+				);
+				```
+
+				Then, you can see those log events ingested in the `logs` table, and the `payload` column can be
+				treated as a regular PostgreSQL composite type.
+
+				#### Ingesting metrics
+
+				When ingesting metrics, the sink behaves exactly the same as when ingesting logs. You must declare
+				the table with the same fields as the JSON serialization of the metric event.
+
+				For example, in order to ingest Vector's internal metrics and only take into account `counter`, `gauge`, and `aggregated_histogram` metric data,
+				you should create the following table:
+
+				```sql
+				CREATE TABLE metrics (
+					name TEXT,
+					namespace TEXT,
+					tags JSONB,
+					timestamp TIMESTAMPTZ,
+					kind TEXT,
+					counter JSONB,
+					gauge JSONB,
+					aggregated_histogram JSONB
+				);
+				```
+
+				And with this Vector configuration:
+				```yaml
+				sources:
+				  internal_metrics:
+				    type: internal_metrics
+				sinks:
+				  postgres:
+				    type: postgres
+				    inputs:
+				      - internal_metrics
+				    endpoint: postgres://postgres:password123@localhost/test
+				    table: metrics
+				```
+				You can see those metric events ingested into the `metrics` table.
+				"""
+		}
+	}
+}
diff --git a/website/cue/reference/services/postgres.cue b/website/cue/reference/services/postgres.cue
new file mode 100644
index 0000000000000..5868b0d605c26
--- /dev/null
+++ b/website/cue/reference/services/postgres.cue
@@ -0,0 +1,10 @@
+package metadata
+
+services: postgres: {
+	name:     "Postgres"
+	thing:    "a \(name) database"
+	url:      urls.postgresql
+	versions: null
+
+	description: "[PostgreSQL](\(urls.postgresql)) is a powerful, open source object-relational database system that uses and extends the SQL language combined with many features that safely store and scale the most complicated data workloads."
+} diff --git a/website/cue/reference/urls.cue b/website/cue/reference/urls.cue index c442f1987a1f0..53878146327e8 100644 --- a/website/cue/reference/urls.cue +++ b/website/cue/reference/urls.cue @@ -412,8 +412,13 @@ urls: { percent_encoding_www_form_urlencoded: "https://url.spec.whatwg.org/#application-x-www-form-urlencoded-percent-encode-set" posix_acls: "https://www.usenix.org/legacy/publications/library/proceedings/usenix03/tech/freenix03/full_papers/gruenbacher/gruenbacher_html/main.html" postgresql: "https://www.postgresql.org/" + postgresql_composite_types: "https://www.postgresql.org/docs/current/rowtypes.html" + postgresql_constraint_trigger: "https://www.postgresql.org/docs/9.0/sql-createconstraint.html" postgresql_csvlog: "https://www.postgresql.org/docs/current/runtime-config-logging.html#RUNTIME-CONFIG-LOGGING-CSVLOG" + postgresql_default_values: "https://www.postgresql.org/docs/current/ddl-default.html" + postgresql_json_functions: "https://www.postgresql.org/docs/17/functions-json.html" postgresql_matching: "https://www.postgresql.org/docs/current/functions-matching.html#FUNCTIONS-POSIX-REGEXP" + postgresql_sqlx_correspondence: "https://docs.rs/sqlx/latest/sqlx/postgres/types/index.html" procfs: "https://en.wikipedia.org/wiki/Procfs#:~:text=The%20proc%20filesystem%20(procfs)%20is,in%20the%20kernel%20than%20traditional" prometheus: "https://prometheus.io/" prometheus_client: "https://prometheus.io/docs/instrumenting/clientlibs/" diff --git a/website/data/redirects.yaml b/website/data/redirects.yaml index e9e502526a460..4e7a94f467b56 100644 --- a/website/data/redirects.yaml +++ b/website/data/redirects.yaml @@ -35,6 +35,7 @@ sinks: - new_relic_logs - new_relic - papertrail +- postgres - prometheus_exporter - prometheus_remote_write - pulsar diff --git a/website/layouts/partials/docs/component-under-hero.html b/website/layouts/partials/docs/component-under-hero.html index 90ede3e69a784..2d0f8331747e0 100644 --- a/website/layouts/partials/docs/component-under-hero.html +++ b/website/layouts/partials/docs/component-under-hero.html @@ -26,6 +26,10 @@ {{ if eq $classes.delivery "at_least_once" }} {{ partial "badge.html" (dict "prefix" "delivery" "word" "at-least-once" "color" "blue" "inline" true) }} {{ end }} + + {{ if eq $classes.delivery "exactly_once" }} + {{ partial "badge.html" (dict "prefix" "delivery" "word" "exactly-once" "color" "blue" "inline" true) }} + {{ end }} {{ if eq $features.acknowledgements true }} {{ partial "badge.html" (dict "prefix" "acknowledgements" "word" "yes" "color" "green" "inline" true) }}