Skip to content

Commit bae7cb5

Browse files
committed
feat: wait for the relayed interfaces before proceeding with the main flow
1 parent 9fece9d commit bae7cb5

File tree

4 files changed

+87
-51
lines changed

4 files changed

+87
-51
lines changed

examples/chat/README.md

+6
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,18 @@ publish calimero-network/examples/chat/v0.0.1 ola
2222
## Run locally with remote peer dial in
2323
This examples shows how to run two sessions locally and connect sessions manually by dialing private remote peer from each session. For the gossip message to pass from one local session to second local session it needs to go "the long way" around (local -> remote -> local).
2424

25+
Additional info:
26+
- Remote instance is running in a private subnet behind NAT.
27+
- Remote instance PeerId: `12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp`
28+
- Remote instance address at the relay server: `ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp`
2529
Run first chat session in interactive mode with remote peer dial.
2630
```
31+
cargo run -p chat-example -- --mode interactive --port 4002 --secret-key-seed 102 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF
2732
```
2833

2934
Run second chat session in interactive mode with remote peer dial.
3035
```
36+
cargo run -p chat-example -- --mode interactive --port 4003 --secret-key-seed 103 --gossip-topic-names calimero-network/examples/chat/v0.0.1 --dial-peer-addrs /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF/p2p-circuit/p2p/12D3KooWP285Hw3CSTdr9oU6Ezz4hDoi6XS5vfDjjNeTJ1uFMGvp --relay-address /ip4/3.71.239.80/udp/4001/quic-v1/p2p/12D3KooWAgFah4EZtWnMMGMUddGdJpb5cq2NubNCAD2jA5AZgbXF
3137
```
3238

3339
In any interactive session publish new message manually:

examples/chat/src/main.rs

+54-45
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ use std::str::FromStr;
33
use clap::Parser;
44
use libp2p::gossipsub;
55
use libp2p::identity;
6+
use libp2p::PeerId;
67
use multiaddr::Multiaddr;
78
use tokio::io::AsyncBufReadExt;
9+
use tracing::debug;
810
use tracing::{error, info};
911
use tracing_subscriber::prelude::*;
1012
use tracing_subscriber::EnvFilter;
@@ -51,7 +53,7 @@ impl FromStr for Mode {
5153
match mode {
5254
"interactive" => Ok(Mode::Interactive),
5355
"echo" => Ok(Mode::Echo),
54-
_ => Err("Expected either 'dial' or 'listen'".to_string()),
56+
_ => Err("Expected either 'interactive' or 'echo'".to_string()),
5557
}
5658
}
5759
}
@@ -72,21 +74,24 @@ async fn main() -> eyre::Result<()> {
7274
let keypair = generate_ed25519(opt.secret_key_seed);
7375

7476
let (network_client, mut network_events) =
75-
network::run(keypair, opt.port, opt.relay_address.clone()).await?;
77+
network::run(keypair.clone(), opt.port, opt.relay_address.clone()).await?;
7678

7779
if let Some(peer_addrs) = opt.dial_peer_addrs {
7880
for addr in peer_addrs {
81+
info!("Dialing peer: {}", addr);
7982
network_client.dial(addr).await?;
8083
}
8184
}
8285

8386
if let Some(topic_names) = opt.gossip_topic_names {
8487
for topic_name in topic_names {
88+
info!("Subscribing to topic: {}", topic_name);
8589
let topic = gossipsub::IdentTopic::new(topic_name);
8690
network_client.subscribe(topic).await?;
8791
}
8892
}
8993

94+
let peer_id = keypair.public().to_peer_id();
9095
match opt.mode {
9196
Mode::Interactive => {
9297
let mut stdin = tokio::io::BufReader::new(tokio::io::stdin()).lines();
@@ -97,7 +102,7 @@ async fn main() -> eyre::Result<()> {
97102
let Some(event) = event else {
98103
break;
99104
};
100-
handle_network_event(Mode::Interactive, network_client.clone(), event).await?;
105+
handle_network_event(network_client.clone(), event, peer_id, false).await?;
101106
}
102107
line = stdin.next_line() => {
103108
if let Some(line) = line? {
@@ -109,7 +114,7 @@ async fn main() -> eyre::Result<()> {
109114
}
110115
Mode::Echo => {
111116
while let Some(event) = network_events.recv().await {
112-
handle_network_event(Mode::Echo, network_client.clone(), event).await?;
117+
handle_network_event(network_client.clone(), event, peer_id, true).await?;
113118
}
114119
}
115120
}
@@ -124,43 +129,50 @@ fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair {
124129
identity::Keypair::ed25519_from_bytes(bytes).expect("only errors on wrong length")
125130
}
126131

132+
const LINE_START: &str = ">>>>>>>>>> ";
133+
127134
async fn handle_network_event(
128-
mode: Mode,
129135
network_client: network::client::NetworkClient,
130136
event: network::types::NetworkEvent,
137+
peer_id: PeerId,
138+
is_echo: bool,
131139
) -> eyre::Result<()> {
132140
match event {
133141
network::types::NetworkEvent::IdentifySent { peer_id } => {
134-
info!("Identify sent to {:?}", peer_id);
142+
debug!("Identify sent to {:?}", peer_id);
135143
}
136144
network::types::NetworkEvent::IdentifyReceived {
137145
peer_id,
138146
observed_addr,
139147
} => {
140-
info!(
148+
debug!(
141149
"Identify received from {:?} at {:?}",
142150
peer_id, observed_addr
143151
);
144152
}
145-
network::types::NetworkEvent::Message { id, message } => {
153+
network::types::NetworkEvent::Message { message, .. } => {
146154
let text = String::from_utf8_lossy(&message.data);
147-
info!("Message from {:?}: {:?}", id, text);
148-
149-
match mode {
150-
Mode::Echo => {
151-
let text = format!("Echo, original: '{}'", text);
152-
153-
match network_client
154-
.publish(message.topic, text.into_bytes())
155-
.await
156-
{
157-
Ok(_) => info!("Echoed message back"),
158-
Err(err) => error!(%err, "Failed to echo message back"),
159-
};
155+
println!("{LINE_START} Received message: {:?}", text);
156+
157+
if is_echo {
158+
let text = format!("echo ({}): '{}'", peer_id, text);
159+
if text.starts_with("echo") {
160+
debug!("Ignoring echo message");
161+
return Ok(());
160162
}
161-
_ => {}
163+
164+
match network_client
165+
.publish(message.topic, text.into_bytes())
166+
.await
167+
{
168+
Ok(_) => debug!("Echoed message back"),
169+
Err(err) => error!(%err, "Failed to echo message back"),
170+
};
162171
}
163172
}
173+
network::types::NetworkEvent::Subscribed { topic, .. } => {
174+
debug!("Subscribed to {:?}", topic);
175+
}
164176
network::types::NetworkEvent::ListeningOn { address, .. } => {
165177
info!("Listening on: {}", address);
166178
}
@@ -185,128 +197,125 @@ async fn handle_line(
185197
let args = match args {
186198
Some(args) => args,
187199
None => {
188-
println!("Usage: dial <multiaddr>");
200+
println!("{LINE_START} Usage: dial <multiaddr>");
189201
return Ok(());
190202
}
191203
};
192204

193205
let addr = match Multiaddr::from_str(args) {
194206
Ok(addr) => addr,
195207
Err(err) => {
196-
println!("Failed to parse MultiAddr: {:?}", err);
208+
println!("{LINE_START} Failed to parse MultiAddr: {:?}", err);
197209
return Ok(());
198210
}
199211
};
200212

201-
info!("Dialing {:?}", addr);
213+
info!("{LINE_START} Dialing {:?}", addr);
202214

203215
match network_client.dial(addr).await {
204216
Ok(_) => {
205-
println!("Peer dialed");
217+
println!("{LINE_START} Peer dialed");
206218
}
207219
Err(err) => {
208-
println!("Failed to parse peer id: {:?}", err);
220+
println!("{LINE_START} Failed to dial peer: {:?}", err);
209221
}
210222
};
211223
}
212224
"subscribe" => {
213225
let args = match args {
214226
Some(args) => args,
215227
None => {
216-
println!("Usage: subscribe <topic-name>");
228+
println!("{LINE_START} Usage: subscribe <topic-name>");
217229
return Ok(());
218230
}
219231
};
220232

221233
let topic = gossipsub::IdentTopic::new(args.to_string());
222234
match network_client.subscribe(topic).await {
223235
Ok(_) => {
224-
println!("Peer dialed");
236+
println!("{LINE_START} Peer dialed");
225237
}
226238
Err(err) => {
227-
println!("Failed to parse peer id: {:?}", err);
239+
println!("{LINE_START} Failed to parse peer id: {:?}", err);
228240
}
229241
};
230242
}
231243
"unsubscribe" => {
232244
let args = match args {
233245
Some(args) => args,
234246
None => {
235-
println!("Usage: unsubscribe <topic-name>");
247+
println!("{LINE_START} Usage: unsubscribe <topic-name>");
236248
return Ok(());
237249
}
238250
};
239251

240252
let topic = gossipsub::IdentTopic::new(args.to_string());
241253
match network_client.unsubscribe(topic).await {
242254
Ok(_) => {
243-
println!("Peer dialed");
255+
println!("{LINE_START} Peer dialed");
244256
}
245257
Err(err) => {
246-
println!("Failed to parse peer id: {:?}", err);
258+
println!("{LINE_START} Failed to parse peer id: {:?}", err);
247259
}
248260
};
249261
}
250262
"publish" => {
251263
let args = match args {
252264
Some(args) => args,
253265
None => {
254-
println!("Usage: message <topic-name> <message>");
266+
println!("{LINE_START} Usage: message <topic-name> <message>");
255267
return Ok(());
256268
}
257269
};
258270

259-
// Extracting topic name and message data from args
260271
let mut args_iter = args.split_whitespace();
261272
let topic_name = match args_iter.next() {
262273
Some(topic) => topic,
263274
None => {
264-
println!("Usage: message <topic-name> <message>");
275+
println!("{LINE_START} Usage: message <topic-name> <message>");
265276
return Ok(());
266277
}
267278
};
268279

269280
let message_data = match args_iter.next() {
270281
Some(data) => data,
271282
None => {
272-
println!("Usage: message <topic-name> <message>");
283+
println!("{LINE_START} Usage: message <topic-name> <message>");
273284
return Ok(());
274285
}
275286
};
276287

277288
let topic = gossipsub::IdentTopic::new(topic_name.to_string());
278-
279-
// Publishing the message
280289
match network_client
281290
.publish(topic.hash(), message_data.as_bytes().to_vec())
282291
.await
283292
{
284293
Ok(_) => {
285-
println!("Message published successfully");
294+
println!("{LINE_START} Message published successfully");
286295
}
287296
Err(err) => {
288-
println!("Failed to publish message: {:?}", err);
297+
println!("{LINE_START} Failed to publish message: {:?}", err);
289298
}
290299
};
291300
}
292301
"peers" => {
293302
let peer_info = network_client.peer_info().await;
294-
info!("Peer info: {:?}", peer_info);
303+
println!("{LINE_START} Peer info: {:?}", peer_info);
295304
}
296305
"mesh-peers" => {
297306
let args = match args {
298307
Some(args) => args,
299308
None => {
300-
println!("Usage: mesh-peers <topic-name>");
309+
println!("{LINE_START} Usage: mesh-peers <topic-name>");
301310
return Ok(());
302311
}
303312
};
304313

305314
let topic = gossipsub::IdentTopic::new(args.to_string());
306315
let mesh_peer_info = network_client.mesh_peer_info(topic.hash()).await;
307-
info!("Mesh peer info: {:?}", mesh_peer_info);
316+
println!("{LINE_START} Mesh peer info: {:?}", mesh_peer_info);
308317
}
309-
_ => info!("Unknown command"),
318+
_ => println!("{LINE_START} Unknown command"),
310319
}
311320

312321
Ok(())

examples/chat/src/network.rs

+24-3
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn run(
9393
observed_addr,
9494
} => {
9595
if peer_id == relay_peer_id {
96-
info!(address=%observed_addr, "Relay told us our observed address");
96+
info!("Relay told us our observed address: {}", observed_addr);
9797
learned_observed_addr = true;
9898
}
9999
}
@@ -105,21 +105,42 @@ pub async fn run(
105105
}
106106
}
107107

108-
// Create reservation on relay server and wait for it to be accepted.
108+
// Create reservation on relay server and wait for it to be accepted ...
109109
client
110110
.listen_on(relay_address.with(multiaddr::Protocol::P2pCircuit))
111111
.await?;
112112

113113
loop {
114114
match event_receiver.recv().await.unwrap() {
115115
types::NetworkEvent::RelayReservationAccepted => {
116-
info!("Relay reservation accepted");
116+
info!("Relay accepted our reservation");
117117
break;
118118
}
119119
event => info!("unexpected: {event:?}"),
120120
};
121121
}
122122

123+
// ... and now wait until we are listening on the "relayed" interfaces
124+
// Reference: https://github.com/libp2p/rust-libp2p/blob/60fd566a955a33c42a6ab6eefc1f0fedef9f8b83/examples/dcutr/src/main.rs#L118
125+
loop {
126+
tokio::select! {
127+
Some(event) = event_receiver.recv() => {
128+
match event {
129+
types::NetworkEvent::ListeningOn { address, .. } => {
130+
info!("Listening on: {}", address);
131+
}
132+
_ => {
133+
error!("Recieved unexpected network event: {:?}", event)
134+
}
135+
}
136+
}
137+
_ = tokio::time::sleep(Duration::from_secs(1)) => {
138+
// Likely listening on all interfaces now, thus continuing by breaking the loop.
139+
break;
140+
}
141+
}
142+
}
143+
123144
Ok((client, event_receiver))
124145
}
125146

examples/chat/src/network/events.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl EventLoop {
4444
SwarmEvent::ConnectionEstablished {
4545
peer_id, endpoint, ..
4646
} => {
47-
info!(peer=%peer_id, ?endpoint, "Connection established");
47+
debug!(peer=%peer_id, ?endpoint, "Connection established");
4848
match endpoint {
4949
libp2p::core::ConnectedPoint::Dialer { address, .. } => {
5050
let dial_preference = match DialPreference::try_from(&address) {
@@ -116,7 +116,7 @@ impl EventLoop {
116116
);
117117
}
118118
SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => {
119-
info!(%error, "Outgoing connection error");
119+
debug!(%error, ?peer_id, "Outgoing connection error");
120120
if let Some(peer_id) = peer_id {
121121
if let Some(entry) = self.pending_dial.remove(&peer_id) {
122122
let _ = entry.sender.send(Err(eyre::eyre!(error)));
@@ -129,7 +129,7 @@ impl EventLoop {
129129
SwarmEvent::Dialing {
130130
peer_id: Some(peer_id),
131131
..
132-
} => debug!("Dialing peer: {}", peer_id),
132+
} => trace!("Dialing peer: {}", peer_id),
133133
SwarmEvent::ExpiredListenAddr { address, .. } => {
134134
trace!("Expired listen address: {}", address)
135135
}

0 commit comments

Comments
 (0)