Skip to content

Commit

Permalink
Should notify connected after data channel opened
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Mar 19, 2024
1 parent 4169caf commit a1efaaa
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 357 deletions.
354 changes: 30 additions & 324 deletions crates/core/src/message/handlers/connection.rs

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion crates/core/src/message/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ impl MessageHandler {
self.handle_dht_events(&dht_ev).await
} else {
let dht_ev = self.dht.join(peer)?;
self.handle_dht_events(&dht_ev).await
self.handle_dht_events(&dht_ev).await.unwrap();
Ok(())
}
}

Expand Down
20 changes: 8 additions & 12 deletions crates/core/src/message/handlers/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ mod test {
use crate::dht::Stabilization;
use crate::ecc::tests::gen_ordered_keys;
use crate::ecc::SecretKey;
use crate::message::handlers::connection::tests::test_listen_join_and_init_find_succeesor;
use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection;
use crate::message::ConnectNodeReport;
use crate::message::ConnectNodeSend;
use crate::message::FindSuccessorReport;
Expand Down Expand Up @@ -140,16 +138,16 @@ mod test {
println!("|| now we connect node1 and node2 ||");
println!("========================================");

test_only_two_nodes_establish_connection(&node1, &node2).await?;
manually_establish_connection(&node1.swarm, &node2.swarm).await;
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");

manually_establish_connection(&node3.swarm, &node2.swarm).await;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
node3.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

println!("=== Check state before stabilization ===");
Expand Down Expand Up @@ -375,18 +373,16 @@ mod test {
println!("|| now we connect node1 and node2 ||");
println!("========================================");

test_only_two_nodes_establish_connection(&node1, &node2).await?;
manually_establish_connection(&node1.swarm, &node2.swarm).await;
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

println!("========================================");
println!("|| now we start join node3 to node2 ||");
println!("========================================");

manually_establish_connection(&node3.swarm, &node2.swarm).await;
test_listen_join_and_init_find_succeesor(&node3, &node2).await?;
node1.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node2.listen_once().await.unwrap();
node3.listen_once().await.unwrap();
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

println!("=== Check state before stabilization ===");
Expand Down
26 changes: 20 additions & 6 deletions crates/core/src/message/handlers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,18 +278,26 @@ impl HandleMsg<SyncVNodeWithSuccessor> for MessageHandler {
mod test {
use super::*;
use crate::ecc::tests::gen_ordered_keys;
use crate::message::handlers::connection::tests::test_only_two_nodes_establish_connection;
use crate::message::Encoder;
use crate::prelude::vnode::VNodeType;
use crate::tests::default::assert_no_more_msg;
use crate::tests::default::prepare_node;
use crate::tests::default::wait_for_msgs;
use crate::tests::manually_establish_connection;

#[tokio::test]
async fn test_store_vnode() -> Result<()> {
let keys = gen_ordered_keys(2);
let (key1, key2) = (keys[0], keys[1]);
let keys = gen_ordered_keys(3);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
let node1 = prepare_node(key1).await;
let node2 = prepare_node(key2).await;
test_only_two_nodes_establish_connection(&node1, &node2).await?;

// This is only a dummy node for using assert_no_more_msg function
let node3 = prepare_node(key3).await;

manually_establish_connection(&node1.swarm, &node2.swarm).await;
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

// Now, node1 is the successor of node2, and node2 is the successor of node1.
// Following tests storing data on node2 and query it from node1.
Expand Down Expand Up @@ -359,10 +367,16 @@ mod test {
#[tokio::test]
async fn test_extend_data() -> Result<()> {
let keys = gen_ordered_keys(2);
let (key1, key2) = (keys[0], keys[1]);
let (key1, key2, key3) = (keys[0], keys[1], keys[2]);
let node1 = prepare_node(key1).await;
let node2 = prepare_node(key2).await;
test_only_two_nodes_establish_connection(&node1, &node2).await?;

// This is only a dummy node for using assert_no_more_msg function
let node3 = prepare_node(key3).await;

manually_establish_connection(&node1.swarm, &node2.swarm).await;
wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;

// Now, node1 is the successor of node2, and node2 is the successor of node1.
// Following tests storing data on node2 and query it from node1.
Expand Down
33 changes: 28 additions & 5 deletions crates/core/src/swarm/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,6 @@ impl TransportCallback for InnerSwarmCallback {
};

match s {
WebrtcConnectionState::Connected => {
self.message_handler.join_dht(did).await?;
}
WebrtcConnectionState::Failed
| WebrtcConnectionState::Disconnected
| WebrtcConnectionState::Closed => {
Expand All @@ -176,10 +173,36 @@ impl TransportCallback for InnerSwarmCallback {
_ => {}
};

// Should use the `on_data_channel_open` function to notify the Connected state.
// It prevents users from blocking the channel creation while
// waiting for data channel opening in send_message.
if s != WebrtcConnectionState::Connected {
self.callback
.on_event(&SwarmEvent::ConnectionStateChange {
peer: did,
state: s,
})
.await?
}

Ok(())
}

async fn on_data_channel_open(&self, cid: &str) -> Result<(), CallbackError> {
let Ok(did) = Did::from_str(cid) else {
tracing::warn!("on_data_channel_open parse did failed: {}", cid);
return Ok(());
};

self.message_handler.join_dht(did).await?;

// Notify Connected state here instead of on_peer_connection_state_change.
// It prevents users from blocking the channel creation while
// waiting for data channel opening in send_message.
self.callback
.on_event(&SwarmEvent::ConnectionStateChange {
peer: did,
state: s,
peer: self.transport.dht.did,
state: WebrtcConnectionState::Connected,
})
.await
}
Expand Down
12 changes: 9 additions & 3 deletions crates/core/src/tests/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,10 @@ pub async fn wait_for_msgs(node1: &Node, node2: &Node, node3: &Node) {
tokio::select! {
Some(payload) = node1.listen_once() => {
println!(
"Msg {} => node1 : {:?}",
"Msg {} -> node1 [{} => {}] : {:?}",
*did_names.get(&payload.signer()).unwrap(),
*did_names.get(&payload.transaction.signer()).unwrap(),
*did_names.get(&payload.transaction.destination).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
Expand All @@ -153,8 +155,10 @@ pub async fn wait_for_msgs(node1: &Node, node2: &Node, node3: &Node) {
tokio::select! {
Some(payload) = node2.listen_once() => {
println!(
"Msg {} => node2 : {:?}",
"Msg {} -> node2 [{} => {}] : {:?}",
*did_names.get(&payload.signer()).unwrap(),
*did_names.get(&payload.transaction.signer()).unwrap(),
*did_names.get(&payload.transaction.destination).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
Expand All @@ -168,8 +172,10 @@ pub async fn wait_for_msgs(node1: &Node, node2: &Node, node3: &Node) {
tokio::select! {
Some(payload) = node3.listen_once() => {
println!(
"Msg {} => node3 : {:?}",
"Msg {} -> node3 [{} => {}] : {:?}",
*did_names.get(&payload.signer()).unwrap(),
*did_names.get(&payload.transaction.signer()).unwrap(),
*did_names.get(&payload.transaction.destination).unwrap(),
payload.transaction.data::<Message>().unwrap()
)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub struct SwarmTransport {
#[derive(Clone)]
pub struct SwarmConnection {
peer: Did,
connection: ConnectionRef<ConnectionOwner>,
pub connection: ConnectionRef<ConnectionOwner>,
}

impl SwarmTransport {
Expand Down Expand Up @@ -160,6 +160,7 @@ impl SwarmTransport {
};

if let Err(e) = conn.connection.webrtc_wait_for_data_channel_open().await {
dbg!(&e);
tracing::warn!(
"[get_and_check_connection] connection {peer} data channel not open, will be dropped, reason: {e:?}"
);
Expand Down
7 changes: 5 additions & 2 deletions crates/transport/src/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ impl InnerTransportCallback {
}

/// Notify the data channel is open.
pub fn on_data_channel_open(&self) {
self.data_channel_state_notifier.wake()
pub async fn on_data_channel_open(&self) {
self.data_channel_state_notifier.wake();
if let Err(e) = self.callback.on_data_channel_open(&self.cid).await {
tracing::error!("Callback on_data_channel_open failed: {e:?}");
}
}

/// Notify the data channel is close.
Expand Down
5 changes: 3 additions & 2 deletions crates/transport/src/connections/native_webrtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl ConnectionInterface for WebrtcConnection {
.set_timeout(WEBRTC_WAIT_FOR_DATA_CHANNEL_OPEN_TIMEOUT);
self.webrtc_data_channel_state_notifier.clone().await;

dbg!(self.webrtc_data_channel.ready_state());

if self.webrtc_data_channel.ready_state() == RTCDataChannelState::Open {
return Ok(());
} else {
Expand Down Expand Up @@ -245,8 +247,7 @@ impl TransportInterface for WebrtcTransport {

let on_open_inner_cb = data_channel_inner_cb.clone();
d.on_open(Box::new(move || {
on_open_inner_cb.on_data_channel_open();
Box::pin(async move {})
Box::pin(async move { on_open_inner_cb.on_data_channel_open().await })
}));

let on_close_inner_cb = data_channel_inner_cb.clone();
Expand Down
5 changes: 4 additions & 1 deletion crates/transport/src/connections/web_sys_webrtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,10 @@ impl TransportInterface for WebSysWebrtcTransport {

let on_open_inner_cb = data_channel_inner_cb.clone();
let on_open = Box::new(move || {
on_open_inner_cb.on_data_channel_open();
let inner_cb = on_open_inner_cb.clone();
spawn_local(async move {
inner_cb.on_data_channel_open().await;
})
});

let on_close_inner_cb = data_channel_inner_cb.clone();
Expand Down
5 changes: 5 additions & 0 deletions crates/transport/src/core/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ type CallbackError = Box<dyn std::error::Error>;
#[cfg_attr(feature = "web-sys-webrtc", async_trait(?Send))]
#[cfg_attr(not(feature = "web-sys-webrtc"), async_trait)]
pub trait TransportCallback {
/// Notify the data channel is open.
async fn on_data_channel_open(&self, _cid: &str) -> Result<(), CallbackError> {
Ok(())
}

/// This method is invoked on a binary message arrival over the data channel of webrtc.
async fn on_message(&self, _cid: &str, _msg: &[u8]) -> Result<(), CallbackError> {
Ok(())
Expand Down

0 comments on commit a1efaaa

Please sign in to comment.