From c4cca02aa5be320b92613dfea3ec0935f4510929 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 2 Oct 2024 13:08:27 +0530 Subject: [PATCH] feat: don't use SQL `min` --- .persistence/events.db | Bin 12288 -> 12288 bytes ...ddfa27d5617cec346fcd252e1db7259241ec.json} | 14 +- ...591590b8fb84685e397655bd6250b83aac2ea.json | 12 - ...2b32ae853c59533b8f82903c76f2e8fc377f9.json | 12 + Cargo.lock | 428 ++++++++---------- configs/simulator.toml | 8 +- uplink/src/collector/events.rs | 19 +- 7 files changed, 218 insertions(+), 275 deletions(-) rename .sqlx/{query-13369ca393c2d834e1eb6a88318937970562a7dd07a650966126e80038dca9cc.json => query-2b595edd80e01c38d5792760e3c9ddfa27d5617cec346fcd252e1db7259241ec.json} (57%) delete mode 100644 .sqlx/query-41ca2170d8f855463f7f8d95ad6591590b8fb84685e397655bd6250b83aac2ea.json create mode 100644 .sqlx/query-b452087781be1222c5f9a71a5b72b32ae853c59533b8f82903c76f2e8fc377f9.json diff --git a/.persistence/events.db b/.persistence/events.db index 7aabade5c4811228e7c688cc39d5d6858a189cff..5d11782c1ff387876c4cac0e5b1b244598511707 100644 GIT binary patch delta 98 zcmZojXh_(=A;83cdb6OwL4JE Result<(StreamName, RawPayload), Error> { - let row = sqlx::query!("SELECT stream, raw FROM payloads ORDER BY id ASC LIMIT 1") + /// Read out one row from the payloads table, without deleting + pub async fn peek(&mut self) -> Result<(i64, StreamName, RawPayload), Error> { + let row = sqlx::query!("SELECT id, stream, raw FROM payloads ORDER BY id ASC LIMIT 1") .fetch_one(&mut self.conn) .await?; + let id = row.id; let stream = row.stream; let raw = row.raw; - Ok((stream, raw)) + Ok((id, stream, raw)) } /// Forget messages acked by the broker - pub async fn pop(&mut self) -> Result<(), Error> { - sqlx::query!("DELETE FROM payloads WHERE id = (SELECT MIN(id) FROM payloads);") - .execute(&mut self.conn) - .await?; + pub async fn pop(&mut self, id: i64) -> Result<(), Error> { + sqlx::query!("DELETE FROM payloads WHERE id = ?1;", id).execute(&mut self.conn).await?; Ok(()) } @@ -125,7 +124,7 @@ async fn push_to_broker_on_ack( ) { 'outer: loop { let mut guard = queue.lock().await; - let (stream, text) = match guard.peek().await { + let (id, stream, text) = match guard.peek().await { Ok(q) => q, Err(Error::Sql(sqlx::Error::RowNotFound)) => { debug!("Looks like event queue is handled for the time being, check again in 5s"); @@ -152,7 +151,7 @@ async fn push_to_broker_on_ack( error!("{e}") } // Pop acknowledged packet - if let Err(e) = queue.lock().await.pop().await { + if let Err(e) = queue.lock().await.pop(id).await { error!("{e}"); } }