@@ -5,30 +5,63 @@ use libp2p::gossipsub;
5
5
use libp2p:: identity;
6
6
use multiaddr:: Multiaddr ;
7
7
use tokio:: io:: AsyncBufReadExt ;
8
- use tracing:: info;
8
+ use tracing:: { error , info} ;
9
9
use tracing_subscriber:: prelude:: * ;
10
10
use tracing_subscriber:: EnvFilter ;
11
11
12
12
mod network;
13
13
14
14
#[ derive( Debug , Parser ) ]
15
- #[ clap( name = "DCUtR client example" ) ]
15
+ #[ clap( name = "Chat example" ) ]
16
16
struct Opt {
17
+ /// The mode (interactive, echo).
18
+ #[ clap( long) ]
19
+ mode : Mode ,
20
+
21
+ /// The port used to listen on all interfaces
22
+ #[ clap( long) ]
23
+ port : u16 ,
24
+
17
25
/// Fixed value to generate deterministic peer id.
18
26
#[ clap( long) ]
19
27
secret_key_seed : u8 ,
20
28
21
- /// The listening address
29
+ /// The listening address of a relay server to connect to.
22
30
#[ clap( long) ]
23
31
relay_address : Multiaddr ,
32
+
33
+ /// Optional list of peer addresses to dial immediately after network bootstrap.
34
+ #[ clap( long) ]
35
+ dial_peer_addrs : Option < Vec < Multiaddr > > ,
36
+
37
+ /// Optional list of gossip topic names to subscribe immediately after network bootstrap.
38
+ #[ clap( long) ]
39
+ gossip_topic_names : Option < Vec < String > > ,
40
+ }
41
+
42
+ #[ derive( Clone , Debug , PartialEq , Parser ) ]
43
+ enum Mode {
44
+ Interactive ,
45
+ Echo ,
46
+ }
47
+
48
+ impl FromStr for Mode {
49
+ type Err = String ;
50
+ fn from_str ( mode : & str ) -> Result < Self , Self :: Err > {
51
+ match mode {
52
+ "interactive" => Ok ( Mode :: Interactive ) ,
53
+ "echo" => Ok ( Mode :: Echo ) ,
54
+ _ => Err ( "Expected either 'dial' or 'listen'" . to_string ( ) ) ,
55
+ }
56
+ }
24
57
}
25
58
26
59
#[ tokio:: main]
27
60
async fn main ( ) -> eyre:: Result < ( ) > {
28
61
tracing_subscriber:: registry ( )
29
- // "info,chat_example::network =debug,{}",
62
+ // "info,chat_example=debug,{}",
30
63
. with ( EnvFilter :: builder ( ) . parse ( format ! (
31
- "info,chat_example::network=debug, {}" ,
64
+ "info,{}" ,
32
65
std:: env:: var( "RUST_LOG" ) . unwrap_or_default( )
33
66
) ) ?)
34
67
. with ( tracing_subscriber:: fmt:: layer ( ) )
@@ -39,24 +72,46 @@ async fn main() -> eyre::Result<()> {
39
72
let keypair = generate_ed25519 ( opt. secret_key_seed ) ;
40
73
41
74
let ( network_client, mut network_events) =
42
- network:: run ( keypair, opt. relay_address . clone ( ) ) . await ?;
75
+ network:: run ( keypair, opt. port , opt. relay_address . clone ( ) ) . await ?;
76
+
77
+ if let Some ( peer_addrs) = opt. dial_peer_addrs {
78
+ for addr in peer_addrs {
79
+ network_client. dial ( addr) . await ?;
80
+ }
81
+ }
82
+
83
+ if let Some ( topic_names) = opt. gossip_topic_names {
84
+ for topic_name in topic_names {
85
+ let topic = gossipsub:: IdentTopic :: new ( topic_name) ;
86
+ network_client. subscribe ( topic) . await ?;
87
+ }
88
+ }
43
89
44
- let mut stdin = tokio:: io:: BufReader :: new ( tokio:: io:: stdin ( ) ) . lines ( ) ;
90
+ match opt. mode {
91
+ Mode :: Interactive => {
92
+ let mut stdin = tokio:: io:: BufReader :: new ( tokio:: io:: stdin ( ) ) . lines ( ) ;
45
93
46
- loop {
47
- tokio:: select! {
48
- event = network_events. recv( ) => {
49
- let Some ( event) = event else {
50
- break ;
51
- } ;
52
- handle_event( network_client. clone( ) , event) . await ?;
53
- }
54
- line = stdin. next_line( ) => {
55
- if let Some ( line) = line? {
56
- handle_line( network_client. clone( ) , line) . await ?;
94
+ loop {
95
+ tokio:: select! {
96
+ event = network_events. recv( ) => {
97
+ let Some ( event) = event else {
98
+ break ;
99
+ } ;
100
+ handle_network_event( Mode :: Interactive , network_client. clone( ) , event) . await ?;
101
+ }
102
+ line = stdin. next_line( ) => {
103
+ if let Some ( line) = line? {
104
+ handle_line( network_client. clone( ) , line) . await ?;
105
+ }
106
+ }
57
107
}
58
108
}
59
109
}
110
+ Mode :: Echo => {
111
+ while let Some ( event) = network_events. recv ( ) . await {
112
+ handle_network_event ( Mode :: Echo , network_client. clone ( ) , event) . await ?;
113
+ }
114
+ }
60
115
}
61
116
62
117
Ok ( ( ) )
@@ -69,8 +124,9 @@ fn generate_ed25519(secret_key_seed: u8) -> identity::Keypair {
69
124
identity:: Keypair :: ed25519_from_bytes ( bytes) . expect ( "only errors on wrong length" )
70
125
}
71
126
72
- async fn handle_event (
73
- _: network:: client:: NetworkClient ,
127
+ async fn handle_network_event (
128
+ mode : Mode ,
129
+ network_client : network:: client:: NetworkClient ,
74
130
event : network:: types:: NetworkEvent ,
75
131
) -> eyre:: Result < ( ) > {
76
132
match event {
@@ -86,6 +142,28 @@ async fn handle_event(
86
142
peer_id, observed_addr
87
143
) ;
88
144
}
145
+ network:: types:: NetworkEvent :: Message { id, message } => {
146
+ let text = String :: from_utf8_lossy ( & message. data ) ;
147
+ info ! ( "Message from {:?}: {:?}" , id, text) ;
148
+
149
+ match mode {
150
+ Mode :: Echo => {
151
+ let text = format ! ( "Echo, original: '{}'" , text) ;
152
+
153
+ match network_client
154
+ . publish ( message. topic , text. into_bytes ( ) )
155
+ . await
156
+ {
157
+ Ok ( _) => info ! ( "Echoed message back" ) ,
158
+ Err ( err) => error ! ( %err, "Failed to echo message back" ) ,
159
+ } ;
160
+ }
161
+ _ => { }
162
+ }
163
+ }
164
+ network:: types:: NetworkEvent :: ListeningOn { address, .. } => {
165
+ info ! ( "Listening on: {}" , address) ;
166
+ }
89
167
event => {
90
168
info ! ( "Unhandled event: {:?}" , event) ;
91
169
}
0 commit comments