Skip to content

Commit

Permalink
Refactor for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 23, 2024
1 parent 0fa5fca commit 62a2489
Showing 1 changed file with 125 additions and 86 deletions.
211 changes: 125 additions & 86 deletions src/follows_differ.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
domain::{follow::Follow, follow_change::FollowChange},
worker_pool::{WorkerTask, WorkerTaskItem},
};
use chrono::DateTime;
use chrono::{DateTime, FixedOffset};
use nostr_sdk::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -34,146 +34,185 @@ where
follow_change_sender,
}
}
}

impl<T> WorkerTask<Box<Event>> for FollowsDiffer<T>
where
T: RepoTrait + Sync + Send,
{
async fn call(&self, worker_task_item: WorkerTaskItem<Box<Event>>) -> Result<()> {
let WorkerTaskItem { item: event } = worker_task_item;
fn convert_timestamp(&self, timestamp: Timestamp) -> Result<DateTime<FixedOffset>> {
DateTime::from_timestamp(timestamp.as_u64() as i64, 0)
.map(|dt| dt.fixed_offset())
.ok_or_else(|| "Failed to convert timestamp to datetime".into())
}

let mut followed_counter = 0;
let mut unfollowed_counter = 0;
let mut unchanged = 0;
let follower = event.pubkey;
/// Initializes a structure that holds the differences between the stored
/// follows and the latest contact list.
async fn initialize_follows_diff(
&self,
follower: &PublicKey,
) -> Result<(HashMap<PublicKey, FollowsDiff>, Option<Timestamp>)> {
let stored_follows = self.repo.get_follows(follower).await?;
let mut follows_diff: HashMap<PublicKey, FollowsDiff> = HashMap::new();

let date_time = DateTime::from_timestamp(event.created_at.as_u64() as i64, 0)
.ok_or("Failed to convert timestamp to datetime")?;
let event_created_at = date_time.fixed_offset();

// Populate stored follows
let stored_follows = self.repo.get_follows(&follower).await?;

let mut maybe_latest_stored_updated_at: Option<Timestamp> = None;

for stored_follow in stored_follows {
let updated_at = Timestamp::from(stored_follow.updated_at.timestamp() as u64);
if let Some(ref mut latest_stored_updated_at) = maybe_latest_stored_updated_at {
if (stored_follow.updated_at.timestamp() as u64) > latest_stored_updated_at.as_u64()
{
*latest_stored_updated_at =
Timestamp::from(stored_follow.updated_at.timestamp() as u64);
if updated_at > *latest_stored_updated_at {
*latest_stored_updated_at = updated_at;
}
} else {
maybe_latest_stored_updated_at =
Some(Timestamp::from(stored_follow.updated_at.timestamp() as u64));
};
maybe_latest_stored_updated_at = Some(updated_at);
}

follows_diff
.entry(stored_follow.followee)
.or_default()
.stored_follow = Some(stored_follow.clone());
}

let first_seen = follows_diff.is_empty();
Ok((follows_diff, maybe_latest_stored_updated_at))
}

// Populate new follows
for tag in event.tags.iter() {
fn populate_new_follows(
&self,
follows_diff: &mut HashMap<PublicKey, FollowsDiff>,
event: &Event,
) {
for tag in &event.tags {
if let Some(TagStandard::PublicKey { public_key, .. }) = tag.as_standardized() {
follows_diff
.entry(*public_key)
.or_default()
.exists_in_latest_contact_list = true;
}
}
}

if let Some(latest_stored_updated_at) = maybe_latest_stored_updated_at {
if event.created_at <= latest_stored_updated_at {
debug!(
"Skipping follow list for {} as it's older than the last update",
follower
);
return Ok(());
}
}
async fn process_follows_diff(
&self,
follows_diff: HashMap<PublicKey, FollowsDiff>,
follower: &PublicKey,
event_created_at: DateTime<FixedOffset>,
) -> Result<(usize, usize, usize)> {
let mut followed_counter = 0;
let mut unfollowed_counter = 0;
let mut unchanged = 0;

// Process follows_diff
for (
followee,
FollowsDiff {
stored_follow: maybe_stored_follow,
exists_in_latest_contact_list,
},
) in follows_diff.into_iter()
{
match maybe_stored_follow {
// We have a DB entry for this followee
for (followee, diff) in follows_diff {
match diff.stored_follow {
Some(mut stored_follow) => {
if exists_in_latest_contact_list {
// Still following same followee, run an upsert that just updates the date
if diff.exists_in_latest_contact_list {
stored_follow.updated_at = event_created_at;

self.repo.upsert_follow(&stored_follow).await?;

unchanged += 1;
} else {
// Doesn't exist in the new follows list so we delete the follow
self.repo.delete_follow(&followee, &follower).await?;

let follow_change =
FollowChange::new_unfollowed(event.created_at, follower, followee);
self.follow_change_sender.send(follow_change)?;
self.send_follow_change(FollowChange::new_unfollowed(
Timestamp::from(event_created_at.timestamp() as u64),
*follower,
followee,
))?;
unfollowed_counter += 1;
}
}
None => {
if followee == follower {
if followee != *follower {
let follow = Follow {
followee,
follower: *follower,
updated_at: event_created_at,
created_at: event_created_at,
};
self.repo.upsert_follow(&follow).await?;
self.send_follow_change(FollowChange::new_followed(
Timestamp::from(event_created_at.timestamp() as u64),
*follower,
followee,
))?;
followed_counter += 1;
} else {
debug!("Skipping self-follow for {}", followee);
continue;
}

// There's no existing follow entry for this followee so this is a new follow
let follow = Follow {
followee,
follower,
updated_at: event_created_at,
created_at: event_created_at,
};

self.repo.upsert_follow(&follow).await?;

let follow_change =
FollowChange::new_followed(event.created_at, follower, followee);

self.follow_change_sender.send(follow_change)?;
followed_counter += 1;
}
}
}

// If we have a new follow list, we log it if it has any follows to avoid noise
Ok((followed_counter, unfollowed_counter, unchanged))
}

fn send_follow_change(&self, follow_change: FollowChange) -> Result<()> {
self.follow_change_sender.send(follow_change)?;
Ok(())
}

fn log_results(
&self,
follower: PublicKey,
event_created_at: Timestamp,
followed_counter: usize,
unfollowed_counter: usize,
unchanged: usize,
first_seen: bool,
) {
if first_seen && followed_counter > 0 {
info!(
"Pubkey {}: date {}, {} followed, new follows list",
follower,
event.created_at.to_human_datetime(),
event_created_at.to_human_datetime(),
followed_counter,
);

return Ok(());
}

// If nothing changed, we don't log anything, it's just noise from older events
if followed_counter > 0 || unfollowed_counter > 0 {
} else if followed_counter > 0 || unfollowed_counter > 0 {
info!(
"Pubkey {}: date {}, {} followed, {} unfollowed, {} unchanged",
follower,
event.created_at.to_human_datetime(),
event_created_at.to_human_datetime(),
followed_counter,
unfollowed_counter,
unchanged
);
}
}
}

impl<T> WorkerTask<Box<Event>> for FollowsDiffer<T>
where
T: RepoTrait + Sync + Send,
{
async fn call(&self, worker_task_item: WorkerTaskItem<Box<Event>>) -> Result<()> {
let WorkerTaskItem { item: event } = worker_task_item;
let follower = event.pubkey;

let event_created_at = self.convert_timestamp(event.created_at)?;

// Get the stored follows and the latest update time from the database
let (mut follows_diff, maybe_latest_stored_updated_at) =
self.initialize_follows_diff(&follower).await?;

// Populate the new follows from the event tags
self.populate_new_follows(&mut follows_diff, &event);

// Check if the event is older than the latest stored update and skip if so
if let Some(latest_stored_updated_at) = maybe_latest_stored_updated_at {
if event.created_at <= latest_stored_updated_at {
debug!(
"Skipping follow list for {} as it's older than the last update",
follower
);
return Ok(());
}
}

let first_seen = follows_diff.is_empty();
// Process the follows_diff and apply changes
let (followed_counter, unfollowed_counter, unchanged) = self
.process_follows_diff(follows_diff, &follower, event_created_at)
.await?;

self.log_results(
follower,
event.created_at,
followed_counter,
unfollowed_counter,
unchanged,
first_seen,
);

Ok(())
}
Expand Down

0 comments on commit 62a2489

Please sign in to comment.