diff --git a/libs/client-api/src/collab_sync/collab_sink.rs b/libs/client-api/src/collab_sync/collab_sink.rs index 171091bd2..82638e51a 100644 --- a/libs/client-api/src/collab_sync/collab_sink.rs +++ b/libs/client-api/src/collab_sync/collab_sink.rs @@ -274,6 +274,15 @@ where } async fn process_next_msg(&self) { + let is_empty_queue = self + .message_queue + .try_lock() + .map(|q| q.is_empty()) + .unwrap_or(true); + if is_empty_queue { + return; + } + let items = { let (mut msg_queue, mut sending_messages) = match ( self.message_queue.try_lock(), @@ -357,14 +366,15 @@ where // Try to merge the next message with the last message. Only merge when: // 1. The last message is not in the flying messages. - // 2. The last message can be merged. + // 2. The last message can be merged and the next message can be merged. // 3. The last message's payload size is less than the maximum payload size. if let Some(last) = items.last_mut() { - if !sending_messages.contains(&last.msg_id()) + let can_merge = !sending_messages.contains(&last.msg_id()) && last.message().payload_size() < self.config.maximum_payload_size && last.mergeable() - && last.merge(&next, &self.config.maximum_payload_size).is_ok() - { + && next.mergeable() + && last.merge(&next, &self.config.maximum_payload_size).is_ok(); + if can_merge { merged_ids .entry(last.msg_id()) .or_insert(vec![]) diff --git a/libs/client-api/src/collab_sync/plugin.rs b/libs/client-api/src/collab_sync/plugin.rs index 8e2a51edf..7b70e5117 100644 --- a/libs/client-api/src/collab_sync/plugin.rs +++ b/libs/client-api/src/collab_sync/plugin.rs @@ -192,7 +192,7 @@ where trace!("queue awareness: {:?}", update); } - ClientCollabMessage::new_update_sync(update_sync) + ClientCollabMessage::new_awareness_sync(update_sync) }); } diff --git a/libs/collab-rt-entity/src/client_message.rs b/libs/collab-rt-entity/src/client_message.rs index 1d884a3d1..996859893 100644 --- a/libs/collab-rt-entity/src/client_message.rs +++ b/libs/collab-rt-entity/src/client_message.rs @@ -13,6 +13,7 @@ use std::hash::{Hash, Hasher}; use yrs::merge_updates_v1; use yrs::updates::decoder::DecoderV1; use yrs::updates::encoder::{Encode, Encoder, EncoderV1}; + pub trait SinkMessage: Clone + Send + Sync + 'static + Ord + Display { fn payload_size(&self) -> usize; fn mergeable(&self) -> bool; @@ -20,8 +21,10 @@ pub trait SinkMessage: Clone + Send + Sync + 'static + Ord + Display { fn is_client_init_sync(&self) -> bool; fn is_server_init_sync(&self) -> bool; fn is_update_sync(&self) -> bool; + fn is_awareness_sync(&self) -> bool; fn is_ping_sync(&self) -> bool; } + #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ClientCollabMessage { ClientInitSync { data: InitSync }, @@ -44,6 +47,10 @@ impl ClientCollabMessage { Self::ServerInitSync(data) } + pub fn new_awareness_sync(data: UpdateSync) -> Self { + Self::ClientAwarenessSync(data) + } + pub fn size(&self) -> usize { match self { ClientCollabMessage::ClientInitSync { data, .. } => data.payload.len(), @@ -185,6 +192,10 @@ impl SinkMessage for ClientCollabMessage { matches!(self, ClientCollabMessage::ClientUpdateSync { .. }) } + fn is_awareness_sync(&self) -> bool { + matches!(self, ClientCollabMessage::ClientAwarenessSync { .. }) + } + fn is_ping_sync(&self) -> bool { matches!(self, ClientCollabMessage::ClientCollabStateCheck { .. }) }