Skip to content

Commit

Permalink
Better control of retention queue size
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Jan 27, 2025
1 parent d8a9fe7 commit 0a9012f
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ impl FollowChange {

self
}

pub fn followed_at(&self) -> DateTime<Utc> {
self.followed_at
}
}

impl fmt::Display for FollowChange {
Expand Down
38 changes: 38 additions & 0 deletions src/domain/followee_notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{FollowChange, NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use crate::rate_limiter::RateLimiter;
use chrono::Utc;
use nostr_sdk::prelude::*;
use ordermap::OrderMap;
use std::fmt::Debug;
Expand Down Expand Up @@ -144,6 +145,43 @@ impl FolloweeNotificationFactory {

vec![]
}

// Force flushes changes older than the specified duration, regardless of rate limiting
pub fn force_flush_old_changes(&mut self, max_age: Duration) -> Vec<NotificationMessage> {
if self.no_followers() {
return vec![];
}

let now = Utc::now();
let max_age = chrono::Duration::from_std(max_age).unwrap_or(chrono::Duration::zero());

// Filter changes that are older than max_age
let (old_changes, remaining_changes): (Vec<_>, Vec<_>) = self
.follow_changes
.drain(..)
.map(|(_, v)| v)
.filter(|v| v.is_follower())
.partition(|v| {
let age = now.signed_duration_since(v.followed_at());
age > max_age
});

// Put back the changes that are not old enough
for change in remaining_changes {
self.follow_changes.insert(*change.follower(), change);
}

if old_changes.is_empty() {
return vec![];
}

self.emptied_at = Some(Instant::now());

old_changes
.chunks(MAX_FOLLOWERS_PER_BATCH)
.map(|batch| batch.to_vec().into())
.collect()
}
}

impl Debug for FolloweeNotificationFactory {
Expand Down
22 changes: 19 additions & 3 deletions src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::time::Duration;
use tracing::info;

type Followee = PublicKey;
Expand Down Expand Up @@ -53,14 +54,29 @@ impl NotificationFactory {
let initial_follow_changes_len = self.follow_changes_len();
let initial_followees_len = self.followees_len();

let messages = self
// First try normal flush
let mut messages = self
.followee_maps
.iter_mut()
.flat_map(|(_, followee_factory)| followee_factory.flush())
.collect::<Vec<_>>();

self.followee_maps
.retain(|_, followee_factory| !followee_factory.should_delete());
// Force flush changes older than 2 days
let force_flushed = self
.followee_maps
.iter_mut()
.flat_map(|(_, followee_factory)| {
followee_factory.force_flush_old_changes(Duration::from_secs(2 * 24 * 3600))
})
.collect::<Vec<_>>();

messages.extend(force_flushed);

// More aggressive cleanup
self.followee_maps.retain(|_, followee_factory| {
!followee_factory.should_delete() && followee_factory.follow_changes.len() < 1000
// Add size limit per followee
});

let follow_changes_len = self.follow_changes_len();
record_metrics(&messages, follow_changes_len);
Expand Down

0 comments on commit 0a9012f

Please sign in to comment.