diff --git a/base_layer/core/tests/helpers/nodes.rs b/base_layer/core/tests/helpers/nodes.rs index 98702db9d8b..5302c519537 100644 --- a/base_layer/core/tests/helpers/nodes.rs +++ b/base_layer/core/tests/helpers/nodes.rs @@ -445,15 +445,16 @@ async fn setup_base_node_services( blockchain_db.clone().into(), base_node_service, )); - let comms = comms + let mut comms = comms .add_protocol_extension(rpc_server) .spawn_with_transport(MemoryTransport) .await .unwrap(); // Set the public address for tests + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); let outbound_nci = handles.expect_handle::(); let local_nci = handles.expect_handle::(); diff --git a/base_layer/p2p/tests/support/comms_and_services.rs b/base_layer/p2p/tests/support/comms_and_services.rs index 4bd2dca73f8..168fa098423 100644 --- a/base_layer/p2p/tests/support/comms_and_services.rs +++ b/base_layer/p2p/tests/support/comms_and_services.rs @@ -51,11 +51,12 @@ pub async fn setup_comms_services( .await .unwrap(); - let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); // Set the public address for tests comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); (comms, dht, messaging_events) } diff --git a/base_layer/wallet/tests/support/comms_and_services.rs b/base_layer/wallet/tests/support/comms_and_services.rs index b6c7344f0e0..6d9f92d3fc9 100644 --- a/base_layer/wallet/tests/support/comms_and_services.rs +++ b/base_layer/wallet/tests/support/comms_and_services.rs @@ -58,11 +58,12 @@ pub async fn setup_comms_services( .await .unwrap(); - let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); // Set the public address for tests comms .node_identity() - .add_public_address(comms.listening_address().clone()); + .add_public_address(address.bind_address().clone()); (comms, dht) } diff --git a/comms/core/examples/stress/service.rs b/comms/core/examples/stress/service.rs index 7880c07519e..2199638f4b0 100644 --- a/comms/core/examples/stress/service.rs +++ b/comms/core/examples/stress/service.rs @@ -63,10 +63,9 @@ pub fn start_service( let (request_tx, request_rx) = mpsc::channel(1); println!( - "Node credentials are {}::{:?} (local_listening_addr='{}')", + "Node credentials are {}::{:?})", node_identity.public_key().to_hex(), node_identity.public_addresses(), - comms_node.listening_address(), ); let service = StressTestService::new( diff --git a/comms/core/examples/stress_test.rs b/comms/core/examples/stress_test.rs index a101198b9ea..b39cc07d1ac 100644 --- a/comms/core/examples/stress_test.rs +++ b/comms/core/examples/stress_test.rs @@ -95,7 +95,7 @@ async fn run() -> Result<(), Error> { temp_dir.as_ref(), public_ip, port, - tor_identity, + tor_identity.clone(), is_tcp, shutdown.to_signal(), ) @@ -105,7 +105,7 @@ async fn run() -> Result<(), Error> { } if !is_tcp { if let Some(tor_identity_path) = tor_identity_path.as_ref() { - save_json(comms_node.hidden_service().unwrap().tor_identity(), tor_identity_path)?; + save_json(&tor_identity.unwrap(), tor_identity_path)?; } } diff --git a/comms/core/examples/tor.rs b/comms/core/examples/tor.rs index ac33ee50c78..cf3b6ef1d9f 100644 --- a/comms/core/examples/tor.rs +++ b/comms/core/examples/tor.rs @@ -87,16 +87,14 @@ async fn run() -> Result<(), Error> { println!("Comms nodes started!"); println!( - "Node 1 is '{}' with address '{:?}' (local_listening_addr='{}')", + "Node 1 is '{}' with address '{:?}')", node_identity1.node_id().short_str(), node_identity1.public_addresses(), - comms_node1.listening_address(), ); println!( - "Node 2 is '{}' with address '{:?}' (local_listening_addr='{}')", + "Node 2 is '{}' with address '{:?}')", node_identity2.node_id().short_str(), node_identity2.public_addresses(), - comms_node2.listening_address(), ); // Let's add node 2 as a peer to node 1 diff --git a/comms/core/src/builder/comms_node.rs b/comms/core/src/builder/comms_node.rs index 92824ee1b86..8885055e33c 100644 --- a/comms/core/src/builder/comms_node.rs +++ b/comms/core/src/builder/comms_node.rs @@ -296,6 +296,10 @@ impl CommsNode { self.connection_manager_requester.get_event_subscription() } + pub fn connection_manager_requester(&mut self)-> &mut ConnectionManagerRequester{ + &mut self.connection_manager_requester + } + /// Get a subscription to `ConnectivityEvent`s pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx { self.connectivity_requester.get_event_subscription() diff --git a/comms/core/src/builder/tests.rs b/comms/core/src/builder/tests.rs index a4d8a0ae9c6..3808031dbc3 100644 --- a/comms/core/src/builder/tests.rs +++ b/comms/core/src/builder/tests.rs @@ -88,7 +88,7 @@ async fn spawn_node( .unwrap(); let (messaging_events_sender, _) = broadcast::channel(100); - let comms_node = comms_node + let mut comms_node = comms_node .add_protocol_extensions(protocols.into()) .add_protocol_extension( MessagingProtocolExtension::new( @@ -107,8 +107,8 @@ async fn spawn_node( .spawn_with_transport(MemoryTransport) .await .unwrap(); - - unpack_enum!(Protocol::Memory(_port) = comms_node.listening_address().iter().next().unwrap()); + let address = comms_node.connection_manager_requester().wait_until_listening().await.unwrap(); + unpack_enum!(Protocol::Memory(_port) = address.bind_address().iter().next().unwrap()); (comms_node, inbound_rx, outbound_tx, messaging_events_sender) } diff --git a/comms/core/tests/tests/rpc.rs b/comms/core/tests/tests/rpc.rs index d97a0596d4e..24d08b11d86 100644 --- a/comms/core/tests/tests/rpc.rs +++ b/comms/core/tests/tests/rpc.rs @@ -44,15 +44,16 @@ async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, RpcServerHandle) { .add_service(GreetingServer::new(GreetingService::default())); let rpc_server_hnd = rpc_server.get_handle(); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_rpc_server(rpc_server) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); (comms, rpc_server_hnd) } diff --git a/comms/core/tests/tests/rpc_stress.rs b/comms/core/tests/tests/rpc_stress.rs index 0e27fa38f90..42fe62cd660 100644 --- a/comms/core/tests/tests/rpc_stress.rs +++ b/comms/core/tests/tests/rpc_stress.rs @@ -46,15 +46,16 @@ async fn spawn_node(signal: ShutdownSignal) -> CommsNode { .finish() .add_service(GreetingServer::new(GreetingService::default())); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_rpc_server(rpc_server) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); comms } diff --git a/comms/core/tests/tests/substream_stress.rs b/comms/core/tests/tests/substream_stress.rs index d36a26d6735..cdeba323d62 100644 --- a/comms/core/tests/tests/substream_stress.rs +++ b/comms/core/tests/tests/substream_stress.rs @@ -41,15 +41,17 @@ const PROTOCOL_NAME: &[u8] = b"test/dummy/protocol"; pub async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, ProtocolNotificationRx) { let (notif_tx, notif_rx) = mpsc::channel(1); - let comms = create_comms(signal) + let mut comms = create_comms(signal) .add_protocol(&[ProtocolId::from_static(PROTOCOL_NAME)], ¬if_tx) .spawn_with_transport(TcpTransport::new()) .await .unwrap(); + + let address = comms.connection_manager_requester().wait_until_listening().await.unwrap(); comms .node_identity() - .set_public_addresses(vec![comms.listening_address().clone()]); + .set_public_addresses(vec![address.bind_address().clone()]); (comms, notif_rx) }