diff --git a/src/domain.rs b/src/domain.rs index 897fe6d..d42becd 100644 --- a/src/domain.rs +++ b/src/domain.rs @@ -11,4 +11,7 @@ pub mod follows_differ; pub use follows_differ::FollowsDiffer; pub mod follow_change_batch; -pub use follow_change_batch::{FollowChangeBatch, MAX_FOLLOWERS_PER_MESSAGE}; +pub use follow_change_batch::{FollowChangeBatch, MAX_FOLLOWERS_PER_BATCH}; + +pub mod followee_aggregator; +pub use followee_aggregator::FolloweeAggregator; diff --git a/src/domain/follow_change_aggregator.rs b/src/domain/follow_change_aggregator.rs index fa6c21e..690e486 100644 --- a/src/domain/follow_change_aggregator.rs +++ b/src/domain/follow_change_aggregator.rs @@ -1,74 +1,45 @@ -use super::{FollowChangeBatch, MAX_FOLLOWERS_PER_MESSAGE}; -use crate::domain::FollowChange; +use super::{FollowChangeBatch, MAX_FOLLOWERS_PER_BATCH}; +use crate::domain::{FollowChange, FolloweeAggregator}; use anyhow::Result; -use governor::clock::{DefaultClock, Reference}; -use governor::{ - clock::Clock, middleware::NoOpMiddleware, nanos::Nanos, state::keyed::DefaultKeyedStateStore, - Quota, RateLimiter, -}; +use governor::clock::Clock; +use governor::clock::DefaultClock; use metrics::{counter, gauge, histogram}; use nostr_sdk::PublicKey; use ordermap::OrderMap; -use std::num::NonZero; use std::time::Duration; use tracing::{debug, info}; -type Follower = PublicKey; type Followee = PublicKey; -type FollowChangeRateLimiter = RateLimiter< - PublicKey, - DefaultKeyedStateStore, - T, - NoOpMiddleware<::Instant>, ->; /// Aggregates `FollowChange` events by merging redundant follow/unfollow actions /// for the same follower-followee pair, managing message batching, and compiling /// the results into `FollowChangeBatch` instances per followee. pub struct FollowChangeAggregator { - followee_maps: OrderMap>, - rate_limiter: FollowChangeRateLimiter, + followee_maps: OrderMap>, max_retention: Duration, + max_messages_per_hour: u32, clock: T, } impl FollowChangeAggregator { - pub fn new(max_follows_per_hour: u32, max_retention_minutes: i64, clock: T) -> Result { - let quota = Quota::per_hour(NonZero::new(max_follows_per_hour).ok_or(anyhow::anyhow!( - "max_follows_per_hour must be greater than zero" - ))?); - - let rate_limiter = RateLimiter::dashmap_with_clock(quota, &clock); - + pub fn new(max_messages_per_hour: u32, max_retention_minutes: i64, clock: T) -> Result { Ok(Self { followee_maps: OrderMap::with_capacity(100_000), - rate_limiter, max_retention: Duration::from_secs(max_retention_minutes as u64 * 60), + max_messages_per_hour, clock, }) } pub fn insert(&mut self, follow_change: FollowChange) { - let follow_changes_map = self + let followee_info = self .followee_maps .entry(follow_change.followee) - .or_default(); - - if let Some((_, existing_change)) = follow_changes_map.get(&follow_change.follower) { - // If the new follow_change is older, do nothing - if follow_change.at < existing_change.at { - return; - } - - // If the new change is of a different type, remove the existing - // entry, they cancel each other - if follow_change.change_type != existing_change.change_type { - follow_changes_map.remove(&follow_change.follower); - return; - } - } + .or_insert_with_key(|_| { + FolloweeAggregator::new(self.max_messages_per_hour, self.clock.clone()) + }); - follow_changes_map.insert(follow_change.follower, (self.clock.now(), follow_change)); + followee_info.add_follower_change(follow_change) } /// Collects follow/unfollow changes per followee into FollowChangeBatch objects, @@ -81,31 +52,11 @@ impl FollowChangeAggregator { let initial_followees_len = self.followees_len(); let mut messages_map: OrderMap> = - OrderMap::with_capacity(self.followee_maps.len() / MAX_FOLLOWERS_PER_MESSAGE); - - self.followee_maps.retain(|_followee, follow_change_map| { - let mut rate_limited_followee = false; - - // TODO: extract_if would have been great here, keep an eye on nightly - let max_retention = &self.max_retention; - let rate_limiter = &mut self.rate_limiter; - follow_change_map.retain(|_follower, (inserted_at, follow_change)| { - let (should_retain, is_followee_rate_limited) = collect_follow_change( - max_retention, - rate_limiter, - inserted_at, - &mut messages_map, - rate_limited_followee, - follow_change, - &self.clock, - ); - - rate_limited_followee = is_followee_rate_limited; - - should_retain - }); + OrderMap::with_capacity(self.followee_maps.len() / MAX_FOLLOWERS_PER_BATCH); - !follow_change_map.is_empty() + self.followee_maps.retain(|_followee, followee_changes| { + followee_changes.drain_into_batches(&self.max_retention, &mut messages_map); + followee_changes.is_deletable() }); let messages = messages_map @@ -143,7 +94,10 @@ impl FollowChangeAggregator { } pub fn follow_changes_len(&self) -> usize { - self.followee_maps.values().map(|m| m.len()).sum() + self.followee_maps + .values() + .map(|m| m.follow_changes.len()) + .sum() } pub fn followees_len(&self) -> usize { @@ -151,65 +105,6 @@ impl FollowChangeAggregator { } } -/// Collects a follow change into a batch and returns whether the change should -/// be retained for later due to rate limits. -/// -/// - If the batches sent so far have been rate-limited, the change will be -/// retained for later processing but only within the max retention period. -/// -/// - Once the retention period is elapsed, the retained changes are sent in batches. -/// Batches with only one item will include friendly ID information, the -/// notification service will show them as "foobar@nos.social is a new -/// follower!" -/// Batches with multiple items will be shown as "You have 29 new followers and 29 unfollows!" -/// -/// - The batching process ensures that no batch contains more than -/// MAX_FOLLOWERS_PER_MESSAGE changes. If it didn't we'd hit the APNS max -/// payload limit. -fn collect_follow_change( - max_retention: &Duration, - rate_limiter: &mut FollowChangeRateLimiter, - inserted_at: &mut T::Instant, - messages_map: &mut OrderMap>, - is_followee_rate_limited: bool, - follow_change: &mut FollowChange, - clock: &T, -) -> (bool, bool) { - let followee = follow_change.followee; - //let retained_for_too_long = inserted_at.elapsed() > *max_retention; - let retained_for_too_long = - clock.now().duration_since(*inserted_at) > Nanos::new(max_retention.as_nanos() as u64); - let followee_batches = messages_map - .entry(followee) - .or_insert_with_key(|followee| vec![FollowChangeBatch::new(*followee)]); - - let latest_batch_for_followee = followee_batches - .last_mut() - .expect("Expected a non-empty batch for the followee"); - - let current_message_has_room = latest_batch_for_followee.len() < MAX_FOLLOWERS_PER_MESSAGE; - let rate_limited = is_followee_rate_limited - || (!retained_for_too_long && rate_limiter.check_key(&followee).is_err()); - - if !rate_limited || retained_for_too_long { - let batch = if latest_batch_for_followee.is_empty() - || (current_message_has_room && retained_for_too_long) - { - latest_batch_for_followee - } else { - followee_batches.push(FollowChangeBatch::new(followee)); - followee_batches - .last_mut() - .expect("New batch should be available") - }; - - batch.add(follow_change.clone()); - return (false, rate_limited); - } - - (true, rate_limited) -} - fn record_metrics(messages: &[FollowChangeBatch], retained_follow_changes: usize) { let mut individual_follow_changes = 0; let mut aggregated_follow_changes = 0; @@ -262,20 +157,20 @@ mod tests { #[test] fn test_insert_unique_follow_change() { - let mut unique_changes = + let mut aggregator = FollowChangeAggregator::new(10, 10, FakeRelativeClock::default()).unwrap(); let follower = Keys::generate().public_key(); let followee = Keys::generate().public_key(); let change1 = create_follow_change(follower, followee, seconds_to_datetime(1)); - unique_changes.insert(change1); + aggregator.insert(change1); let change2 = create_follow_change(follower, followee, seconds_to_datetime(1)); - unique_changes.insert(change2.clone()); + aggregator.insert(change2.clone()); // When they share the same time, the last change added should be kept - let messages = unique_changes.drain_into_batches(); + let messages = aggregator.drain_into_batches(); assert_eq!(messages.len(), 1); let message = &messages[0]; assert_message_eq(message, &followee, [follower], &[]); @@ -392,13 +287,16 @@ mod tests { #[test] fn test_no_message_after_rate_limit_is_hit_but_retention_not_elapsed() { // After one single follow change the rate limit will be hit - let max_follows_per_hour = 1; + let max_messages_per_hour = 1; let max_retention_minutes = 10; let clock = FakeRelativeClock::default(); - let mut aggregator = - FollowChangeAggregator::new(max_follows_per_hour, max_retention_minutes, clock.clone()) - .unwrap(); + let mut aggregator = FollowChangeAggregator::new( + max_messages_per_hour, + max_retention_minutes, + clock.clone(), + ) + .unwrap(); let follower1 = Keys::generate().public_key(); let follower2 = Keys::generate().public_key(); @@ -408,40 +306,47 @@ mod tests { let change1 = create_follow_change(follower1, followee, seconds_to_datetime(1)); aggregator.insert(change1.clone()); + // We hit the rate limit, but the retention time hasn't elapsed yet. + // The rate is one follow per hour, so we only get one message, the + // other one is retained. + let messages = aggregator.drain_into_batches(); + assert_batches_eq(&messages, &[(followee, &[change1])]); + let change2 = create_follow_change(follower2, followee, seconds_to_datetime(1)); aggregator.insert(change2.clone()); let change3 = create_follow_change(follower3, followee, seconds_to_datetime(1)); aggregator.insert(change3.clone()); - // We passed the rate limit, but the retention time hasn't elapsed yet. - // The rate is one follow per hour, so we only get one message, the - // other one is retained. - let messages = aggregator.drain_into_batches(); - assert_batches_eq(&messages, &[(followee, &[change1])]); - // We hit the limit so the rest of the messages are retained let messages = aggregator.drain_into_batches(); assert_batches_eq(&messages, &[]); + assert_eq!(aggregator.follow_changes_len(), 2); - // We pass the max retention time, the rest of the messages are packed - // together as they are less than MAX_FOLLOWERS_PER_MESSAGE - clock.advance(Duration::from_secs((max_retention_minutes as u64 + 1) * 60)); + // We pass the max retention time, but we still are under the rate limit so we get nothing + clock.advance(Duration::from_secs((max_retention_minutes as u64) * 60)); + let messages = aggregator.drain_into_batches(); + assert_batches_eq(&messages, &[]); + // We clear the rate limit + clock.advance(Duration::from_secs(50 * 60)); let messages = aggregator.drain_into_batches(); assert_batches_eq(&messages, &[(followee, &[change2, change3])]); } #[test] fn test_batch_sizes_after_rate_limit_and_retention_period() { - let max_follows_per_hour = 1; // After one single follow change, the rate limit will be hit + let max_messages_per_hour = 1; // After one single follow change, the rate limit will be hit let max_retention_minutes = 10; - const MAX_FOLLOWERS_TRIPLED: u64 = 3 * MAX_FOLLOWERS_PER_MESSAGE as u64; // The number of messages we will send for testing + const MAX_FOLLOWERS_TRIPLED: usize = 3 * MAX_FOLLOWERS_PER_BATCH as usize; // The number of messages we will send for testing let clock = FakeRelativeClock::default(); - let mut aggregator = - FollowChangeAggregator::new(max_follows_per_hour, max_retention_minutes, clock.clone()) - .unwrap(); + let mut aggregator = FollowChangeAggregator::new( + max_messages_per_hour, + max_retention_minutes, + clock.clone(), + ) + .unwrap(); let followee = Keys::generate().public_key(); @@ -450,18 +355,27 @@ mod tests { let follower = Keys::generate().public_key(); let change = create_follow_change(follower, followee, seconds_to_datetime(i)); aggregator.insert(change.clone()); + + clock.advance(Duration::from_secs(1)); changes.push(change); } // After inserting MAX_FOLLOWERS_TRIPLED changes, we hit the rate limit immediately after the first message. // The first message will be sent immediately, while the rest should be retained. let messages = aggregator.drain_into_batches(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].len(), 1); + assert_eq!( + messages.len(), + 1, + "Expected a single message, got {:?}, changes: {:?}", + messages, + messages.iter().map(|m| m.len()).sum::() + ); + assert_eq!(messages[0].len(), MAX_FOLLOWERS_PER_BATCH); // All other messages are retained due to rate limiting let messages = aggregator.drain_into_batches(); assert_eq!(messages.len(), 0); + assert_eq!(aggregator.follow_changes_len(), 2 * MAX_FOLLOWERS_PER_BATCH,); // Just before the max_retention time elapses.. clock.advance(Duration::from_secs((max_retention_minutes as u64 - 1) * 60)); @@ -475,19 +389,22 @@ mod tests { ); aggregator.insert(change.clone()); + assert_eq!( + aggregator.follow_changes_len(), + 2 * MAX_FOLLOWERS_PER_BATCH + 1 + ); // After the max retention time elapses, all retained changes should be sent, in batches. clock.advance(Duration::from_secs(2 * 60)); let messages = aggregator.drain_into_batches(); - assert_eq!(messages.len(), 3); - // First couple should contain MAX_FOLLOWERS_PER_MESSAGE changes - assert_eq!(messages[0].len(), MAX_FOLLOWERS_PER_MESSAGE); - assert_eq!(messages[1].len(), MAX_FOLLOWERS_PER_MESSAGE); - assert_eq!( - messages[2].len() as u64, - MAX_FOLLOWERS_TRIPLED - 2u64 * MAX_FOLLOWERS_PER_MESSAGE as u64 - 1u64 - ); // Thirds batch should contain the remaining changes + assert_eq!(messages.len(), 2); + // First couple should contain MAX_FOLLOWERS_BATCH changes, they surpassed the maximum retention time, so they are sent regardless of being rate limited + assert_eq!(messages[0].len(), MAX_FOLLOWERS_PER_BATCH); + assert_eq!(messages[1].len(), MAX_FOLLOWERS_PER_BATCH); + + // The last change added is not sent, is too new and we already sent the maximum amount of messages for the period + assert_eq!(aggregator.follow_changes_len(), 1); // And another change arrives let follower = Keys::generate().public_key(); @@ -498,10 +415,21 @@ mod tests { ); aggregator.insert(change.clone()); + // And another one for a different followee + let followee2 = Keys::generate().public_key(); + let follower = Keys::generate().public_key(); + let change = create_follow_change( + follower, + followee2, + seconds_to_datetime(MAX_FOLLOWERS_TRIPLED), + ); + aggregator.insert(change.clone()); + let messages = aggregator.drain_into_batches(); - // Nothing is sent, the user just receive the messages so this one gets into next batch - assert_eq!(messages.len(), 0); + // Only the one for the new followee is sent as it's not rate limited, the rest already hit the limit. + assert_eq!(messages.len(), 1); + assert_eq!(aggregator.follow_changes_len(), 2); // The max retention time elapses again, all retained changes should be sent, in batches. clock.advance(Duration::from_secs((max_retention_minutes as u64 + 1) * 60)); @@ -511,6 +439,7 @@ mod tests { // This one has a single item assert_eq!(messages.len(), 1); assert_eq!(messages[0].len(), 2); + assert_eq!(aggregator.follow_changes_len(), 0); } #[test] @@ -574,8 +503,8 @@ mod tests { assert_bag_eq!(unfollows_vec, unfollows.as_ref()); } - fn seconds_to_datetime(seconds: u64) -> DateTime { - DateTime::::from(UNIX_EPOCH + Duration::from_secs(seconds)) + fn seconds_to_datetime(seconds: usize) -> DateTime { + DateTime::::from(UNIX_EPOCH + Duration::from_secs(seconds as u64)) } fn assert_batches_eq(actual: &[FollowChangeBatch], expected: &[(PublicKey, &[FollowChange])]) { diff --git a/src/domain/follow_change_batch.rs b/src/domain/follow_change_batch.rs index a9df87a..5080f62 100644 --- a/src/domain/follow_change_batch.rs +++ b/src/domain/follow_change_batch.rs @@ -9,7 +9,7 @@ use std::fmt::Debug; // This is the maximum total of followers and unfollowers we can have in a single message based on the APNS limit of 4096 bytes. // See tests done to discover this number in the notifications server: // https://github.com/planetary-social/nos-notification-service-go/blob/4728744c6125909375478ec5ddae5934f1d7e1f7/service/adapters/apns/apns_test.go#L162-L243 -pub const MAX_FOLLOWERS_PER_MESSAGE: usize = 58; +pub const MAX_FOLLOWERS_PER_BATCH: usize = 58; /// An serializable message containing follow changes for a single followee. #[derive(Clone, Serialize, Eq, PartialEq, Ord, PartialOrd)] diff --git a/src/domain/followee_aggregator.rs b/src/domain/followee_aggregator.rs new file mode 100644 index 0000000..d4e7fd3 --- /dev/null +++ b/src/domain/followee_aggregator.rs @@ -0,0 +1,148 @@ +use super::{FollowChangeBatch, MAX_FOLLOWERS_PER_BATCH}; +use crate::domain::FollowChange; +use crate::rate_counter::RateCounter; +use governor::{clock::Clock, clock::Reference, nanos::Nanos}; +use nostr_sdk::PublicKey; +use ordermap::OrderMap; +use std::time::Duration; + +type Follower = PublicKey; + +const ONE_HOUR: Duration = Duration::from_secs(60 * 60); + +pub struct FolloweeAggregator { + rate_counter: RateCounter, + pub follow_changes: OrderMap, + clock: T, +} + +impl FolloweeAggregator { + pub fn new(max_messages_per_hour: u32, clock: T) -> Self { + let rate_counter = RateCounter::new(max_messages_per_hour, ONE_HOUR, clock.clone()); + + Self { + rate_counter, + follow_changes: OrderMap::with_capacity(100), + clock, + } + } + + pub fn add_follower_change(&mut self, follow_change: FollowChange) { + let follower = follow_change.follower; + + if let Some((_, existing_change)) = self.follow_changes.get(&follower) { + // If the new follow_change is older, do nothing + if follow_change.at < existing_change.at { + return; + } + + // If the new change is of a different type, remove the existing + // entry, they cancel each other + if follow_change.change_type != existing_change.change_type { + self.follow_changes.remove(&follower); + return; + } + } + + self.follow_changes + .insert(follower, (self.clock.now(), follow_change)); + } + + pub fn is_deletable(&mut self) -> bool { + // We want to retain the followee info even if there are no + // changes so we remember the rate limit for one more period in + // case new changes arrive + !self.follow_changes.is_empty() || self.rate_counter.is_hit() + } + + pub fn drain_into_batches( + &mut self, + max_retention: &Duration, + messages_map: &mut OrderMap>, + ) { + // TODO: extract_if would have been great here, keep an eye on nightly + let rate_counter = &mut self.rate_counter; + let follow_change_map = &mut self.follow_changes; + + follow_change_map.retain(|_follower, (inserted_at, follow_change)| { + collect_follow_change( + max_retention, + inserted_at, + messages_map, + follow_change, + &self.clock, + rate_counter, + ) + }); + } +} + +/// Collects a follow change into a batch and returns whether the change should +/// be retained for later due to rate limits. +/// +/// - If the batches sent so far have been rate-limited, the change will be +/// retained for later processing but only within the max retention period. +/// +/// - Once the retention period is elapsed, the retained changes are sent in batches. +/// Batches with only one item will include friendly ID information, the +/// notification service will show them as "foobar@nos.social is a new +/// follower!" +/// Batches with multiple items will be shown as "You have 29 new followers and 29 unfollows!" +/// +/// - The batching process ensures that no batch contains more than +/// MAX_FOLLOWERS_PER_BATCH changes. If it didn't we'd hit the APNS max +/// payload limit. +fn collect_follow_change( + max_retention: &Duration, + inserted_at: &mut T::Instant, + messages_map: &mut OrderMap>, + follow_change: &mut FollowChange, + clock: &T, + rate_counter: &mut RateCounter, +) -> bool { + let followee = follow_change.followee; + //let retained_for_too_long = inserted_at.elapsed() > *max_retention; + let retained_for_too_long = + clock.now().duration_since(*inserted_at) > Nanos::new(max_retention.as_nanos() as u64); + let followee_batches = messages_map + .entry(followee) + .or_insert_with_key(|followee| vec![FollowChangeBatch::new(*followee)]); + + let latest_batch_for_followee = followee_batches + .last_mut() + .expect("Expected a non-empty batch for the followee"); + + let used_batch_has_room = !latest_batch_for_followee.is_empty() + && latest_batch_for_followee.len() < MAX_FOLLOWERS_PER_BATCH; + + let rate_limited = rate_counter.is_hit(); + if !rate_limited && !retained_for_too_long { + add_to_new_batch(followee, follow_change.clone(), followee_batches); + rate_counter.bump(); + return false; + } + + if used_batch_has_room { + latest_batch_for_followee.add(follow_change.clone()); + return false; + } + + if rate_limited && !retained_for_too_long { + return true; + } + + // If we reached this point it means that the batch is full or the retention time has elapsed + add_to_new_batch(followee, follow_change.clone(), followee_batches); + rate_counter.bump(); + false +} + +fn add_to_new_batch( + followee: PublicKey, + follow_change: FollowChange, + followee_batches: &mut Vec, +) { + let mut batch = FollowChangeBatch::new(followee); + batch.add(follow_change); + followee_batches.push(batch); +} diff --git a/src/main.rs b/src/main.rs index 1a7f2f7..581c154 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,7 @@ mod google_pubsub_client; mod http_server; mod migrations; mod publisher; +mod rate_counter; mod relay_subscriber; mod repo; mod worker_pool; diff --git a/src/rate_counter.rs b/src/rate_counter.rs new file mode 100644 index 0000000..5d294c2 --- /dev/null +++ b/src/rate_counter.rs @@ -0,0 +1,37 @@ +use governor::clock::Clock; +use governor::clock::Reference; +use std::time::Duration; + +pub struct RateCounter +where + T: Clock, +{ + items_sent: Vec, + limit: u32, + clock: T, + max_age: Duration, +} + +impl RateCounter { + pub fn new(limit: u32, max_age: Duration, clock: T) -> Self { + Self { + items_sent: Vec::new(), + limit, + clock, + max_age, + } + } + + pub fn bump(&mut self) { + let now = self.clock.now(); + self.items_sent.push(now); + + // Remove all items older than max_age + self.items_sent + .retain(|&instant| now.duration_since(instant) < self.max_age.into()); + } + + pub fn is_hit(&mut self) -> bool { + self.items_sent.len() as u32 >= self.limit + } +}