Skip to content

Commit

Permalink
Merge pull request #1170 from AppFlowy-IO/fix_sync_issues
Browse files Browse the repository at this point in the history
fix: do not merge awareness sync data info client update sync data
  • Loading branch information
khorshuheng authored Jan 20, 2025
2 parents 524bdef + 0047609 commit 227066b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 5 deletions.
18 changes: 14 additions & 4 deletions libs/client-api/src/collab_sync/collab_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ where
}

async fn process_next_msg(&self) {
let is_empty_queue = self
.message_queue
.try_lock()
.map(|q| q.is_empty())
.unwrap_or(true);
if is_empty_queue {
return;
}

let items = {
let (mut msg_queue, mut sending_messages) = match (
self.message_queue.try_lock(),
Expand Down Expand Up @@ -357,14 +366,15 @@ where

// Try to merge the next message with the last message. Only merge when:
// 1. The last message is not in the flying messages.
// 2. The last message can be merged.
// 2. The last message can be merged and the next message can be merged.
// 3. The last message's payload size is less than the maximum payload size.
if let Some(last) = items.last_mut() {
if !sending_messages.contains(&last.msg_id())
let can_merge = !sending_messages.contains(&last.msg_id())
&& last.message().payload_size() < self.config.maximum_payload_size
&& last.mergeable()
&& last.merge(&next, &self.config.maximum_payload_size).is_ok()
{
&& next.mergeable()
&& last.merge(&next, &self.config.maximum_payload_size).is_ok();
if can_merge {
merged_ids
.entry(last.msg_id())
.or_insert(vec![])
Expand Down
2 changes: 1 addition & 1 deletion libs/client-api/src/collab_sync/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ where
trace!("queue awareness: {:?}", update);
}

ClientCollabMessage::new_update_sync(update_sync)
ClientCollabMessage::new_awareness_sync(update_sync)
});
}

Expand Down
11 changes: 11 additions & 0 deletions libs/collab-rt-entity/src/client_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@ use std::hash::{Hash, Hasher};
use yrs::merge_updates_v1;
use yrs::updates::decoder::DecoderV1;
use yrs::updates::encoder::{Encode, Encoder, EncoderV1};

pub trait SinkMessage: Clone + Send + Sync + 'static + Ord + Display {
fn payload_size(&self) -> usize;
fn mergeable(&self) -> bool;
fn merge(&mut self, other: &Self, maximum_payload_size: &usize) -> Result<bool, Error>;
fn is_client_init_sync(&self) -> bool;
fn is_server_init_sync(&self) -> bool;
fn is_update_sync(&self) -> bool;
fn is_awareness_sync(&self) -> bool;
fn is_ping_sync(&self) -> bool;
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ClientCollabMessage {
ClientInitSync { data: InitSync },
Expand All @@ -44,6 +47,10 @@ impl ClientCollabMessage {
Self::ServerInitSync(data)
}

pub fn new_awareness_sync(data: UpdateSync) -> Self {
Self::ClientAwarenessSync(data)
}

pub fn size(&self) -> usize {
match self {
ClientCollabMessage::ClientInitSync { data, .. } => data.payload.len(),
Expand Down Expand Up @@ -185,6 +192,10 @@ impl SinkMessage for ClientCollabMessage {
matches!(self, ClientCollabMessage::ClientUpdateSync { .. })
}

fn is_awareness_sync(&self) -> bool {
matches!(self, ClientCollabMessage::ClientAwarenessSync { .. })
}

fn is_ping_sync(&self) -> bool {
matches!(self, ClientCollabMessage::ClientCollabStateCheck { .. })
}
Expand Down

0 comments on commit 227066b

Please sign in to comment.