Skip to content

Commit

Permalink
Merge pull request #791 from etehtsea/enh-pg-out-example
Browse files Browse the repository at this point in the history
feat(outbound-pg): Reuse connection during request lifecycle
  • Loading branch information
itowlson authored Sep 28, 2022
2 parents 8a2a6d3 + ebf24c5 commit e4245de
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 37 deletions.
53 changes: 33 additions & 20 deletions crates/outbound-pg/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use anyhow::anyhow;
use outbound_pg::*;
use std::collections::HashMap;
use tokio_postgres::{
tls::NoTlsStream,
types::{ToSql, Type},
Connection, NoTls, Row, Socket,
Client, NoTls, Row,
};

pub use outbound_pg::add_to_linker;
Expand All @@ -16,8 +16,9 @@ use wit_bindgen_wasmtime::{async_trait, wasmtime::Linker};
wit_bindgen_wasmtime::export!({paths: ["../../wit/ephemeral/outbound-pg.wit"], async: *});

/// A simple implementation to support outbound pg connection
#[derive(Default, Clone)]
pub struct OutboundPg;
pub struct OutboundPg {
pub connections: HashMap<String, Client>,
}

impl HostComponent for OutboundPg {
type State = Self;
Expand All @@ -33,7 +34,9 @@ impl HostComponent for OutboundPg {
&self,
_component: &spin_manifest::CoreComponent,
) -> anyhow::Result<Self::State> {
Ok(Self)
let connections = std::collections::HashMap::new();

Ok(Self { connections })
}
}

Expand All @@ -45,19 +48,16 @@ impl outbound_pg::OutboundPg for OutboundPg {
statement: &str,
params: Vec<ParameterValue<'_>>,
) -> Result<u64, PgError> {
let (client, connection) = tokio_postgres::connect(address, NoTls)
.await
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;

spawn(connection);

let params: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(to_sql_parameter)
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;

let nrow = client
let nrow = self
.get_client(address)
.await
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?
.execute(statement, params.as_slice())
.await
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
Expand All @@ -71,19 +71,16 @@ impl outbound_pg::OutboundPg for OutboundPg {
statement: &str,
params: Vec<ParameterValue<'_>>,
) -> Result<RowSet, PgError> {
let (client, connection) = tokio_postgres::connect(address, NoTls)
.await
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;

spawn(connection);

let params: Vec<&(dyn ToSql + Sync)> = params
.iter()
.map(to_sql_parameter)
.collect::<anyhow::Result<Vec<_>>>()
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;

let results = client
let results = self
.get_client(address)
.await
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?
.query(statement, params.as_slice())
.await
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
Expand Down Expand Up @@ -246,10 +243,26 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
Ok(value)
}

fn spawn(connection: Connection<Socket, NoTlsStream>) {
impl OutboundPg {
async fn get_client(&mut self, address: &str) -> anyhow::Result<&Client> {
let client = match self.connections.entry(address.to_owned()) {
std::collections::hash_map::Entry::Occupied(o) => o.into_mut(),
std::collections::hash_map::Entry::Vacant(v) => v.insert(build_client(address).await?),
};
Ok(client)
}
}

async fn build_client(address: &str) -> anyhow::Result<Client> {
tracing::log::debug!("Build new connection: {}", address);

let (client, connection) = tokio_postgres::connect(address, NoTls).await?;

tokio::spawn(async move {
if let Err(e) = connection.await {
tracing::warn!("Postgres connection error: {}", e);
}
});

Ok(client)
}
4 changes: 3 additions & 1 deletion crates/trigger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ pub fn add_default_host_components<T: Default + Send + 'static>(
builder.add_host_component(outbound_redis::OutboundRedis {
connections: Arc::new(RwLock::new(HashMap::new())),
})?;
builder.add_host_component(outbound_pg::OutboundPg)?;
builder.add_host_component(outbound_pg::OutboundPg {
connections: HashMap::new(),
})?;
Ok(())
}

Expand Down
49 changes: 49 additions & 0 deletions examples/rust-outbound-pg/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Spin Outbound PostgreSQL example

This example shows how to access a PostgreSQL database from Spin component.

## Spin up

From example root:

```
createdb spin_dev
psql -d spin_dev -f db/testdata.sql
RUST_LOG=spin=trace spin build --up
```

Curl the read route:

```
$ curl -i localhost:3000/read
HTTP/1.1 200 OK
content-length: 501
date: Sun, 25 Sep 2022 15:45:02 GMT
Found 2 article(s) as follows:
article: Article {
id: 1,
title: "My Life as a Goat",
content: "I went to Nepal to live as a goat, and it was much better than being a butler.",
authorname: "E. Blackadder",
}
article: Article {
id: 2,
title: "Magnificent Octopus",
content: "Once upon a time there was a lovely little sausage.",
authorname: "S. Baldrick",
}
(Column info: id:DbDataType::Int32, title:DbDataType::Str, content:DbDataType::Str, authorname:DbDataType::Str)
```

Curl the write route:

```
$ curl -i localhost:3000/write
HTTP/1.1 200 OK
content-length: 9
date: Sun, 25 Sep 2022 15:46:22 GMT
Count: 3
```
21 changes: 15 additions & 6 deletions examples/rust-outbound-pg/db/testdata.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
CREATE TABLE articletest (
id integer not null,
title varchar(40) not null,
content varchar(8000) not null,
authorname varchar(40) not null
id integer GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
title varchar(40) NOT NULL,
content text NOT NULL,
authorname varchar(40) NOT NULL
);

INSERT INTO articletest VALUES (1, 'My Life as a Goat', 'I went to Nepal to live as a goat, and it was much better than being a butler.', 'E. Blackadder');
INSERT INTO articletest VALUES (2, 'Magnificent Octopus', 'Once upon a time there was a lovely little sausage.', 'S. Baldrick');
INSERT INTO articletest (title, content, authorname) VALUES
(
'My Life as a Goat',
'I went to Nepal to live as a goat, and it was much better than being a butler.',
'E. Blackadder'
),
(
'Magnificent Octopus',
'Once upon a time there was a lovely little sausage.',
'S. Baldrick'
);
6 changes: 2 additions & 4 deletions examples/rust-outbound-pg/spin.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ trigger = { type = "http", base = "/" }
version = "0.1.0"

[[component]]
environment = { DB_URL = "host=localhost user=postgres password=123 dbname=postgres" }
environment = { DB_URL = "host=localhost user=postgres dbname=spin_dev" }
id = "outbound-pg"
source = "target/wasm32-wasi/release/rust_outbound_pg.wasm"
[component.trigger]
#route = "/write"
route = "/read"
route = "/..."
[component.build]
command = "cargo build --target wasm32-wasi --release"

63 changes: 57 additions & 6 deletions examples/rust-outbound-pg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ struct Article {
}

#[http_component]
fn process(req: Request) -> Result<Response> {
match req.uri().path() {
"/read" => read(req),
"/write" => write(req),
"/pg_backend_pid" => pg_backend_pid(req),
_ => Ok(http::Response::builder()
.status(404)
.body(Some("Not found".into()))?),
}
}

fn read(_req: Request) -> Result<Response> {
let address = std::env::var(DB_URL_ENV)?;

let sql = "select id, title, content, authorname from articletest";
let sql = "SELECT id, title, content, authorname FROM articletest";
let rowset = pg::query(&address, sql, &[])
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;

Expand Down Expand Up @@ -64,18 +75,48 @@ fn read(_req: Request) -> Result<Response> {
.status(200)
.body(Some(response.into()))?)
}
/*

fn write(_req: Request) -> Result<Response> {
let address = std::env::var(DB_URL_ENV)?;

let sql = "insert into articletest values ('aaa', 'bbb', 'ccc')";
let nrow_executed = pg::execute(&address, sql, &vec![]).map_err(|_| anyhow!("Error execute pg command"))?;
let sql = "INSERT INTO articletest (title, content, authorname) VALUES ('aaa', 'bbb', 'ccc')";
let nrow_executed =
pg::execute(&address, sql, &[]).map_err(|_| anyhow!("Error execute pg command"))?;

println!("nrow_executed: {}", nrow_executed);

Ok(http::Response::builder().status(200).body(None)?)
let sql = "SELECT COUNT(id) FROM articletest";
let rowset = pg::query(&address, sql, &[])
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;
let row = &rowset.rows[0];
let count = as_bigint(&row[0])?;
let response = format!("Count: {}\n", count);

Ok(http::Response::builder()
.status(200)
.body(Some(response.into()))?)
}

fn pg_backend_pid(_req: Request) -> Result<Response> {
let address = std::env::var(DB_URL_ENV)?;
let sql = "SELECT pg_backend_pid()";

let get_pid = || {
let rowset = pg::query(&address, sql, &[])
.map_err(|e| anyhow!("Error executing Postgres query: {:?}", e))?;

let row = &rowset.rows[0];
as_int(&row[0])
};

assert_eq!(get_pid()?, get_pid()?);

let response = format!("pg_backend_pid: {}\n", get_pid()?);

Ok(http::Response::builder()
.status(200)
.body(Some(response.into()))?)
}
*/

fn as_owned_string(value: &pg::DbValue) -> anyhow::Result<String> {
match value {
Expand All @@ -94,6 +135,16 @@ fn as_int(value: &pg::DbValue) -> anyhow::Result<i32> {
}
}

fn as_bigint(value: &pg::DbValue) -> anyhow::Result<i64> {
match value {
pg::DbValue::Int64(n) => Ok(*n),
_ => Err(anyhow!(
"Expected integer from database but got {:?}",
value
)),
}
}

fn format_col(column: &pg::Column) -> String {
format!("{}:{:?}", column.name, column.data_type)
}

0 comments on commit e4245de

Please sign in to comment.