From 01118c5623d7f7cffd3d50e88a024cea2c1cabc0 Mon Sep 17 00:00:00 2001 From: Konstantin Shabanov Date: Sun, 25 Sep 2022 21:51:00 +0600 Subject: [PATCH 1/2] feat(examples): Expand rust-outbound-pg example Uncomment write fn and map it to the `/write` endpoint. Also, add basic README. Signed-off-by: Konstantin Shabanov --- examples/rust-outbound-pg/README.md | 49 +++++++++++++++++++++++ examples/rust-outbound-pg/db/testdata.sql | 21 +++++++--- examples/rust-outbound-pg/spin.toml | 6 +-- examples/rust-outbound-pg/src/lib.rs | 41 ++++++++++++++++--- 4 files changed, 101 insertions(+), 16 deletions(-) create mode 100644 examples/rust-outbound-pg/README.md diff --git a/examples/rust-outbound-pg/README.md b/examples/rust-outbound-pg/README.md new file mode 100644 index 0000000000..bdbb3034e9 --- /dev/null +++ b/examples/rust-outbound-pg/README.md @@ -0,0 +1,49 @@ +# Spin Outbound PostgreSQL example + +This example shows how to access a PostgreSQL database from Spin component. + +## Spin up + +From example root: + +``` +createdb spin_dev +psql -d spin_dev -f db/testdata.sql +RUST_LOG=spin=trace spin build --up +``` + +Curl the read route: + +``` +$ curl -i localhost:3000/read +HTTP/1.1 200 OK +content-length: 501 +date: Sun, 25 Sep 2022 15:45:02 GMT + +Found 2 article(s) as follows: +article: Article { + id: 1, + title: "My Life as a Goat", + content: "I went to Nepal to live as a goat, and it was much better than being a butler.", + authorname: "E. Blackadder", +} +article: Article { + id: 2, + title: "Magnificent Octopus", + content: "Once upon a time there was a lovely little sausage.", + authorname: "S. Baldrick", +} + +(Column info: id:DbDataType::Int32, title:DbDataType::Str, content:DbDataType::Str, authorname:DbDataType::Str) +``` + +Curl the write route: + +``` +$ curl -i localhost:3000/write +HTTP/1.1 200 OK +content-length: 9 +date: Sun, 25 Sep 2022 15:46:22 GMT + +Count: 3 +``` diff --git a/examples/rust-outbound-pg/db/testdata.sql b/examples/rust-outbound-pg/db/testdata.sql index f7d3aca133..1537bfe7e6 100644 --- a/examples/rust-outbound-pg/db/testdata.sql +++ b/examples/rust-outbound-pg/db/testdata.sql @@ -1,9 +1,18 @@ CREATE TABLE articletest ( - id integer not null, - title varchar(40) not null, - content varchar(8000) not null, - authorname varchar(40) not null + id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY, + title varchar(40) NOT NULL, + content text NOT NULL, + authorname varchar(40) NOT NULL ); -INSERT INTO articletest VALUES (1, 'My Life as a Goat', 'I went to Nepal to live as a goat, and it was much better than being a butler.', 'E. Blackadder'); -INSERT INTO articletest VALUES (2, 'Magnificent Octopus', 'Once upon a time there was a lovely little sausage.', 'S. Baldrick'); +INSERT INTO articletest (title, content, authorname) VALUES +( + 'My Life as a Goat', + 'I went to Nepal to live as a goat, and it was much better than being a butler.', + 'E. Blackadder' +), +( + 'Magnificent Octopus', + 'Once upon a time there was a lovely little sausage.', + 'S. Baldrick' +); diff --git a/examples/rust-outbound-pg/spin.toml b/examples/rust-outbound-pg/spin.toml index d84b3568ca..e8da1acd02 100644 --- a/examples/rust-outbound-pg/spin.toml +++ b/examples/rust-outbound-pg/spin.toml @@ -5,12 +5,10 @@ trigger = { type = "http", base = "/" } version = "0.1.0" [[component]] -environment = { DB_URL = "host=localhost user=postgres password=123 dbname=postgres" } +environment = { DB_URL = "host=localhost user=postgres dbname=spin_dev" } id = "outbound-pg" source = "target/wasm32-wasi/release/rust_outbound_pg.wasm" [component.trigger] -#route = "/write" -route = "/read" +route = "/..." [component.build] command = "cargo build --target wasm32-wasi --release" - diff --git a/examples/rust-outbound-pg/src/lib.rs b/examples/rust-outbound-pg/src/lib.rs index d84628af2c..70c0babe79 100644 --- a/examples/rust-outbound-pg/src/lib.rs +++ b/examples/rust-outbound-pg/src/lib.rs @@ -18,10 +18,20 @@ struct Article { } #[http_component] +fn process(req: Request) -> Result { + match req.uri().path() { + "/read" => read(req), + "/write" => write(req), + _ => Ok(http::Response::builder() + .status(404) + .body(Some("Not found".into()))?), + } +} + fn read(_req: Request) -> Result { let address = std::env::var(DB_URL_ENV)?; - let sql = "select id, title, content, authorname from articletest"; + let sql = "SELECT id, title, content, authorname FROM articletest"; let rowset = pg::query(&address, sql, &[]) .map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?; @@ -64,18 +74,27 @@ fn read(_req: Request) -> Result { .status(200) .body(Some(response.into()))?) } -/* + fn write(_req: Request) -> Result { let address = std::env::var(DB_URL_ENV)?; - let sql = "insert into articletest values ('aaa', 'bbb', 'ccc')"; - let nrow_executed = pg::execute(&address, sql, &vec![]).map_err(|_| anyhow!("Error execute pg command"))?; + let sql = "INSERT INTO articletest (title, content, authorname) VALUES ('aaa', 'bbb', 'ccc')"; + let nrow_executed = + pg::execute(&address, sql, &[]).map_err(|_| anyhow!("Error execute pg command"))?; println!("nrow_executed: {}", nrow_executed); - Ok(http::Response::builder().status(200).body(None)?) + let sql = "SELECT COUNT(id) FROM articletest"; + let rowset = pg::query(&address, sql, &[]) + .map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?; + let row = &rowset.rows[0]; + let count = as_bigint(&row[0])?; + let response = format!("Count: {}\n", count); + + Ok(http::Response::builder() + .status(200) + .body(Some(response.into()))?) } -*/ fn as_owned_string(value: &pg::DbValue) -> anyhow::Result { match value { @@ -94,6 +113,16 @@ fn as_int(value: &pg::DbValue) -> anyhow::Result { } } +fn as_bigint(value: &pg::DbValue) -> anyhow::Result { + match value { + pg::DbValue::Int64(n) => Ok(*n), + _ => Err(anyhow!( + "Expected integer from database but got {:?}", + value + )), + } +} + fn format_col(column: &pg::Column) -> String { format!("{}:{:?}", column.name, column.data_type) } From ebf24c524a7b5f10ca6754340eca5e1036007fa3 Mon Sep 17 00:00:00 2001 From: Konstantin Shabanov Date: Mon, 26 Sep 2022 10:55:05 +0600 Subject: [PATCH 2/2] feat(outbound-pg): Reuse connection during request lifecycle Also, pg_backend_pid endpoint into outbound-pg example. Before the fix: $ curl -i localhost:3000/pg_backend_pid HTTP/1.1 500 Internal Server Error content-length: 0 date: Tue, 27 Sep 2022 14:03:50 GMT thread '' panicked at 'assertion failed: `(left == right)` left: `592913`, right: `592914`', src/lib.rs:112:5 After the fix: $ curl -i localhost:3000/pg_backend_pid HTTP/1.1 200 OK content-length: 23 date: Tue, 27 Sep 2022 14:07:14 GMT pg_backend_pid: 595194 Ideally, this fix has to be covered by an integration test rather than manual testing through examples, but the testing environment has to be set up first. Refs: #667. Signed-off-by: Konstantin Shabanov --- crates/outbound-pg/src/lib.rs | 53 +++++++++++++++++----------- crates/trigger/src/lib.rs | 4 ++- examples/rust-outbound-pg/src/lib.rs | 22 ++++++++++++ 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/crates/outbound-pg/src/lib.rs b/crates/outbound-pg/src/lib.rs index 64446c8b0f..fc19eab911 100644 --- a/crates/outbound-pg/src/lib.rs +++ b/crates/outbound-pg/src/lib.rs @@ -1,9 +1,9 @@ use anyhow::anyhow; use outbound_pg::*; +use std::collections::HashMap; use tokio_postgres::{ - tls::NoTlsStream, types::{ToSql, Type}, - Connection, NoTls, Row, Socket, + Client, NoTls, Row, }; pub use outbound_pg::add_to_linker; @@ -16,8 +16,9 @@ use wit_bindgen_wasmtime::{async_trait, wasmtime::Linker}; wit_bindgen_wasmtime::export!({paths: ["../../wit/ephemeral/outbound-pg.wit"], async: *}); /// A simple implementation to support outbound pg connection -#[derive(Default, Clone)] -pub struct OutboundPg; +pub struct OutboundPg { + pub connections: HashMap, +} impl HostComponent for OutboundPg { type State = Self; @@ -33,7 +34,9 @@ impl HostComponent for OutboundPg { &self, _component: &spin_manifest::CoreComponent, ) -> anyhow::Result { - Ok(Self) + let connections = std::collections::HashMap::new(); + + Ok(Self { connections }) } } @@ -45,19 +48,16 @@ impl outbound_pg::OutboundPg for OutboundPg { statement: &str, params: Vec>, ) -> Result { - let (client, connection) = tokio_postgres::connect(address, NoTls) - .await - .map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?; - - spawn(connection); - let params: Vec<&(dyn ToSql + Sync)> = params .iter() .map(to_sql_parameter) .collect::>>() .map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?; - let nrow = client + let nrow = self + .get_client(address) + .await + .map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))? .execute(statement, params.as_slice()) .await .map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?; @@ -71,19 +71,16 @@ impl outbound_pg::OutboundPg for OutboundPg { statement: &str, params: Vec>, ) -> Result { - let (client, connection) = tokio_postgres::connect(address, NoTls) - .await - .map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?; - - spawn(connection); - let params: Vec<&(dyn ToSql + Sync)> = params .iter() .map(to_sql_parameter) .collect::>>() .map_err(|e| PgError::BadParameter(format!("{:?}", e)))?; - let results = client + let results = self + .get_client(address) + .await + .map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))? .query(statement, params.as_slice()) .await .map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?; @@ -246,10 +243,26 @@ fn convert_entry(row: &Row, index: usize) -> Result) { +impl OutboundPg { + async fn get_client(&mut self, address: &str) -> anyhow::Result<&Client> { + let client = match self.connections.entry(address.to_owned()) { + std::collections::hash_map::Entry::Occupied(o) => o.into_mut(), + std::collections::hash_map::Entry::Vacant(v) => v.insert(build_client(address).await?), + }; + Ok(client) + } +} + +async fn build_client(address: &str) -> anyhow::Result { + tracing::log::debug!("Build new connection: {}", address); + + let (client, connection) = tokio_postgres::connect(address, NoTls).await?; + tokio::spawn(async move { if let Err(e) = connection.await { tracing::warn!("Postgres connection error: {}", e); } }); + + Ok(client) } diff --git a/crates/trigger/src/lib.rs b/crates/trigger/src/lib.rs index e388c1160c..1cf6186e62 100644 --- a/crates/trigger/src/lib.rs +++ b/crates/trigger/src/lib.rs @@ -138,7 +138,9 @@ pub fn add_default_host_components( builder.add_host_component(outbound_redis::OutboundRedis { connections: Arc::new(RwLock::new(HashMap::new())), })?; - builder.add_host_component(outbound_pg::OutboundPg)?; + builder.add_host_component(outbound_pg::OutboundPg { + connections: HashMap::new(), + })?; Ok(()) } diff --git a/examples/rust-outbound-pg/src/lib.rs b/examples/rust-outbound-pg/src/lib.rs index 70c0babe79..835237e93d 100644 --- a/examples/rust-outbound-pg/src/lib.rs +++ b/examples/rust-outbound-pg/src/lib.rs @@ -22,6 +22,7 @@ fn process(req: Request) -> Result { match req.uri().path() { "/read" => read(req), "/write" => write(req), + "/pg_backend_pid" => pg_backend_pid(req), _ => Ok(http::Response::builder() .status(404) .body(Some("Not found".into()))?), @@ -96,6 +97,27 @@ fn write(_req: Request) -> Result { .body(Some(response.into()))?) } +fn pg_backend_pid(_req: Request) -> Result { + let address = std::env::var(DB_URL_ENV)?; + let sql = "SELECT pg_backend_pid()"; + + let get_pid = || { + let rowset = pg::query(&address, sql, &[]) + .map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?; + + let row = &rowset.rows[0]; + as_int(&row[0]) + }; + + assert_eq!(get_pid()?, get_pid()?); + + let response = format!("pg_backend_pid: {}\n", get_pid()?); + + Ok(http::Response::builder() + .status(200) + .body(Some(response.into()))?) +} + fn as_owned_string(value: &pg::DbValue) -> anyhow::Result { match value { pg::DbValue::Str(s) => Ok(s.to_owned()),