Skip to content

Commit

Permalink
chore: add metrics for write embeeding to pg (#1084)
Browse files Browse the repository at this point in the history
  • Loading branch information
appflowy authored Dec 18, 2024
1 parent c0f7b1d commit 5f388c7
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 10 deletions.
36 changes: 30 additions & 6 deletions services/appflowy-collaborate/src/indexer/indexer_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use rayon::prelude::*;
use sqlx::PgPool;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::time::timeout;
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -101,8 +102,13 @@ impl IndexerScheduler {
this.index_enabled(),
num_thread
);

if this.index_enabled() {
tokio::spawn(spawn_write_indexing(rx, this.pg_pool.clone()));
tokio::spawn(spawn_write_indexing(
rx,
this.pg_pool.clone(),
this.metrics.clone(),
));
tokio::spawn(handle_unindexed_collabs(this.clone()));
}

Expand Down Expand Up @@ -301,7 +307,7 @@ impl IndexerScheduler {
indexer.embed(&embedder, chunks)
});
let duration = start.elapsed();
metrics.record_processing_time(duration.as_millis());
metrics.record_generate_embedding_time(duration.as_millis());

match result {
Ok(embed_result) => match embed_result {
Expand Down Expand Up @@ -494,7 +500,11 @@ async fn index_unindexd_collab(
}

const EMBEDDING_RECORD_BUFFER_SIZE: usize = 5;
async fn spawn_write_indexing(mut rx: UnboundedReceiver<EmbeddingRecord>, pg_pool: PgPool) {
async fn spawn_write_indexing(
mut rx: UnboundedReceiver<EmbeddingRecord>,
pg_pool: PgPool,
metrics: Arc<EmbeddingMetrics>,
) {
let mut buf = Vec::with_capacity(EMBEDDING_RECORD_BUFFER_SIZE);
loop {
let n = rx.recv_many(&mut buf, EMBEDDING_RECORD_BUFFER_SIZE).await;
Expand All @@ -503,14 +513,28 @@ async fn spawn_write_indexing(mut rx: UnboundedReceiver<EmbeddingRecord>, pg_poo
break;
}

let start = Instant::now();
let records = buf.drain(..n).collect::<Vec<_>>();
for record in records.iter() {
info!(
"[Embedding] generate collab:{} embeddings, tokens used: {}",
record.object_id, record.tokens_used
);
}
match batch_insert_records(&pg_pool, records).await {

let result = timeout(
Duration::from_secs(20),
batch_insert_records(&pg_pool, records),
)
.await
.unwrap_or_else(|_| {
Err(AppError::Internal(anyhow!(
"timeout when writing embeddings"
)))
});

metrics.record_write_embedding_time(start.elapsed().as_millis());
match result {
Ok(_) => trace!("[Embedding] save {} embeddings to disk", n),
Err(err) => error!("Failed to write collab embedding to disk:{}", err),
}
Expand Down Expand Up @@ -567,7 +591,7 @@ fn process_collab(
let chunks = indexer.create_embedded_chunks(&collab, embdder.model())?;
let result = indexer.embed(embdder, chunks);
let duration = start_time.elapsed();
metrics.record_processing_time(duration.as_millis());
metrics.record_generate_embedding_time(duration.as_millis());

match result {
Ok(Some(embeddings)) => Ok(Some((embeddings.tokens_consumed, embeddings.params))),
Expand Down
18 changes: 15 additions & 3 deletions services/appflowy-collaborate/src/indexer/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ pub struct EmbeddingMetrics {
total_embed_count: Counter,
failed_embed_count: Counter,
processing_time_histogram: Histogram,
write_embedding_time_histogram: Histogram,
}

impl EmbeddingMetrics {
fn init() -> Self {
Self {
total_embed_count: Counter::default(),
failed_embed_count: Counter::default(),
processing_time_histogram: Histogram::new([100.0, 300.0, 800.0, 2000.0, 5000.0].into_iter()),
processing_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()),
write_embedding_time_histogram: Histogram::new([500.0, 1000.0, 5000.0, 8000.0].into_iter()),
}
}

Expand All @@ -36,6 +38,11 @@ impl EmbeddingMetrics {
"Histogram of embedding processing times",
metrics.processing_time_histogram.clone(),
);
realtime_registry.register(
"write_embedding_time_seconds",
"Histogram of embedding write times",
metrics.write_embedding_time_histogram.clone(),
);

metrics
}
Expand All @@ -48,8 +55,13 @@ impl EmbeddingMetrics {
self.failed_embed_count.inc_by(count);
}

pub fn record_processing_time(&self, millis: u128) {
tracing::trace!("[Embedding]: processing time: {}ms", millis);
pub fn record_generate_embedding_time(&self, millis: u128) {
tracing::trace!("[Embedding]: generate embeddings cost: {}ms", millis);
self.processing_time_histogram.observe(millis as f64);
}

pub fn record_write_embedding_time(&self, millis: u128) {
tracing::trace!("[Embedding]: write embedding time cost: {}ms", millis);
self.write_embedding_time_histogram.observe(millis as f64);
}
}
2 changes: 1 addition & 1 deletion services/appflowy-worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Config {
// Adapted from: https://github.com/AppFlowy-IO/AppFlowy-Cloud/issues/984
smtp_username: get_env_var("APPFLOWY_MAILER_SMTP_USERNAME", "sender@example.com"),
smtp_password: get_env_var("APPFLOWY_MAILER_SMTP_PASSWORD", "password").into(),
smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper").into(),
smtp_tls_kind: get_env_var("APPFLOWY_MAILER_SMTP_TLS_KIND", "wrapper"),
},
})
}
Expand Down

0 comments on commit 5f388c7

Please sign in to comment.