Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SWvheerden committed Jan 25, 2024
1 parent 005e285 commit 1f6a79f
Show file tree
Hide file tree
Showing 11 changed files with 31 additions and 23 deletions.
5 changes: 3 additions & 2 deletions base_layer/core/tests/helpers/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,15 +445,16 @@ async fn setup_base_node_services(
blockchain_db.clone().into(),
base_node_service,
));
let comms = comms
let mut comms = comms
.add_protocol_extension(rpc_server)
.spawn_with_transport(MemoryTransport)
.await
.unwrap();
// Set the public address for tests
let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
comms
.node_identity()
.add_public_address(comms.listening_address().clone());
.add_public_address(address.bind_address().clone());

let outbound_nci = handles.expect_handle::<OutboundNodeCommsInterface>();
let local_nci = handles.expect_handle::<LocalNodeCommsInterface>();
Expand Down
5 changes: 3 additions & 2 deletions base_layer/p2p/tests/support/comms_and_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ pub async fn setup_comms_services(
.await
.unwrap();

let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
// Set the public address for tests
comms
.node_identity()
.add_public_address(comms.listening_address().clone());
.add_public_address(address.bind_address().clone());

(comms, dht, messaging_events)
}
5 changes: 3 additions & 2 deletions base_layer/wallet/tests/support/comms_and_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ pub async fn setup_comms_services(
.await
.unwrap();

let comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
let mut comms = comms.spawn_with_transport(MemoryTransport).await.unwrap();
let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
// Set the public address for tests
comms
.node_identity()
.add_public_address(comms.listening_address().clone());
.add_public_address(address.bind_address().clone());

(comms, dht)
}
Expand Down
3 changes: 1 addition & 2 deletions comms/core/examples/stress/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,9 @@ pub fn start_service(
let (request_tx, request_rx) = mpsc::channel(1);

println!(
"Node credentials are {}::{:?} (local_listening_addr='{}')",
"Node credentials are {}::{:?})",
node_identity.public_key().to_hex(),
node_identity.public_addresses(),
comms_node.listening_address(),
);

let service = StressTestService::new(
Expand Down
4 changes: 2 additions & 2 deletions comms/core/examples/stress_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async fn run() -> Result<(), Error> {
temp_dir.as_ref(),
public_ip,
port,
tor_identity,
tor_identity.clone(),
is_tcp,
shutdown.to_signal(),
)
Expand All @@ -105,7 +105,7 @@ async fn run() -> Result<(), Error> {
}
if !is_tcp {
if let Some(tor_identity_path) = tor_identity_path.as_ref() {
save_json(comms_node.hidden_service().unwrap().tor_identity(), tor_identity_path)?;
save_json(&tor_identity.unwrap(), tor_identity_path)?;
}
}

Expand Down
6 changes: 2 additions & 4 deletions comms/core/examples/tor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,14 @@ async fn run() -> Result<(), Error> {

println!("Comms nodes started!");
println!(
"Node 1 is '{}' with address '{:?}' (local_listening_addr='{}')",
"Node 1 is '{}' with address '{:?}')",
node_identity1.node_id().short_str(),
node_identity1.public_addresses(),
comms_node1.listening_address(),
);
println!(
"Node 2 is '{}' with address '{:?}' (local_listening_addr='{}')",
"Node 2 is '{}' with address '{:?}')",
node_identity2.node_id().short_str(),
node_identity2.public_addresses(),
comms_node2.listening_address(),
);

// Let's add node 2 as a peer to node 1
Expand Down
4 changes: 4 additions & 0 deletions comms/core/src/builder/comms_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ impl CommsNode {
self.connection_manager_requester.get_event_subscription()
}

pub fn connection_manager_requester(&mut self)-> &mut ConnectionManagerRequester{
&mut self.connection_manager_requester
}

/// Get a subscription to `ConnectivityEvent`s
pub fn subscribe_connectivity_events(&self) -> ConnectivityEventRx {
self.connectivity_requester.get_event_subscription()
Expand Down
6 changes: 3 additions & 3 deletions comms/core/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn spawn_node(
.unwrap();

let (messaging_events_sender, _) = broadcast::channel(100);
let comms_node = comms_node
let mut comms_node = comms_node
.add_protocol_extensions(protocols.into())
.add_protocol_extension(
MessagingProtocolExtension::new(
Expand All @@ -107,8 +107,8 @@ async fn spawn_node(
.spawn_with_transport(MemoryTransport)
.await
.unwrap();

unpack_enum!(Protocol::Memory(_port) = comms_node.listening_address().iter().next().unwrap());
let address = comms_node.connection_manager_requester().wait_until_listening().await.unwrap();
unpack_enum!(Protocol::Memory(_port) = address.bind_address().iter().next().unwrap());

(comms_node, inbound_rx, outbound_tx, messaging_events_sender)
}
Expand Down
5 changes: 3 additions & 2 deletions comms/core/tests/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, RpcServerHandle) {
.add_service(GreetingServer::new(GreetingService::default()));

let rpc_server_hnd = rpc_server.get_handle();
let comms = create_comms(signal)
let mut comms = create_comms(signal)
.add_rpc_server(rpc_server)
.spawn_with_transport(TcpTransport::new())
.await
.unwrap();

let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
comms
.node_identity()
.set_public_addresses(vec![comms.listening_address().clone()]);
.set_public_addresses(vec![address.bind_address().clone()]);

(comms, rpc_server_hnd)
}
Expand Down
5 changes: 3 additions & 2 deletions comms/core/tests/tests/rpc_stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ async fn spawn_node(signal: ShutdownSignal) -> CommsNode {
.finish()
.add_service(GreetingServer::new(GreetingService::default()));

let comms = create_comms(signal)
let mut comms = create_comms(signal)
.add_rpc_server(rpc_server)
.spawn_with_transport(TcpTransport::new())
.await
.unwrap();

let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
comms
.node_identity()
.set_public_addresses(vec![comms.listening_address().clone()]);
.set_public_addresses(vec![address.bind_address().clone()]);

comms
}
Expand Down
6 changes: 4 additions & 2 deletions comms/core/tests/tests/substream_stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,17 @@ const PROTOCOL_NAME: &[u8] = b"test/dummy/protocol";

pub async fn spawn_node(signal: ShutdownSignal) -> (CommsNode, ProtocolNotificationRx<Substream>) {
let (notif_tx, notif_rx) = mpsc::channel(1);
let comms = create_comms(signal)
let mut comms = create_comms(signal)
.add_protocol(&[ProtocolId::from_static(PROTOCOL_NAME)], &notif_tx)
.spawn_with_transport(TcpTransport::new())
.await
.unwrap();


let address = comms.connection_manager_requester().wait_until_listening().await.unwrap();
comms
.node_identity()
.set_public_addresses(vec![comms.listening_address().clone()]);
.set_public_addresses(vec![address.bind_address().clone()]);

(comms, notif_rx)
}
Expand Down

0 comments on commit 1f6a79f

Please sign in to comment.