Skip to content

Commit

Permalink
optimizing further
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubDoka committed Dec 25, 2023
1 parent bacb93c commit 98b2eb1
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 31 deletions.
Binary file modified perf.data
Binary file not shown.
Binary file modified perf.data.old
Binary file not shown.
24 changes: 6 additions & 18 deletions swarm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use libp2p_core::upgrade;
use libp2p_core::upgrade::{NegotiationError, ProtocolError};
use libp2p_core::Endpoint;
use libp2p_identity::PeerId;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -154,9 +154,8 @@ where
>,

local_supported_protocols:
HashSet<AsStrHashEq<<THandler::InboundProtocol as UpgradeInfoSend>::Info>>,
HashMap<AsStrHashEq<<THandler::InboundProtocol as UpgradeInfoSend>::Info>, bool>,
remote_supported_protocols: HashSet<StreamProtocol>,
temp_protocols_set: HashSet<AsStrHashEq<<THandler::InboundProtocol as UpgradeInfoSend>::Info>>,
temp_protocols: Vec<StreamProtocol>,

idle_timeout: Duration,
Expand Down Expand Up @@ -194,12 +193,12 @@ where
.listen_protocol()
.upgrade()
.protocol_info()
.map(AsStrHashEq)
.collect::<HashSet<_>>();
.map(|i| (AsStrHashEq(i), true))
.collect::<HashMap<_, _>>();

if !local_supported_protocols.is_empty() {
let temp = local_supported_protocols
.iter()
.keys()
.filter_map(|i| StreamProtocol::try_from_owned(i.0.as_ref().to_owned()).ok())
.collect::<Vec<_>>();
handler.on_connection_event(ConnectionEvent::LocalProtocolsChange(
Expand All @@ -219,7 +218,6 @@ where
requested_substreams: Default::default(),
local_supported_protocols,
remote_supported_protocols: Default::default(),
temp_protocols_set: Default::default(),
temp_protocols: Default::default(),
idle_timeout,
stream_counter: ActiveStreamCounter::default(),
Expand Down Expand Up @@ -268,7 +266,6 @@ where
substream_upgrade_protocol_override,
local_supported_protocols: supported_protocols,
remote_supported_protocols,
temp_protocols_set,
temp_protocols,
idle_timeout,
stream_counter,
Expand Down Expand Up @@ -462,17 +459,9 @@ where
}
}

temp_protocols_set.clear();
temp_protocols_set.extend(
handler
.listen_protocol()
.upgrade()
.protocol_info()
.map(AsStrHashEq),
);
let changes = ProtocolsChange::from_full_sets(
supported_protocols,
temp_protocols_set,
handler.listen_protocol().upgrade().protocol_info(),
temp_protocols,
);
let mut has_changes = false;
Expand All @@ -481,7 +470,6 @@ where
has_changes = true;
}
if has_changes {
std::mem::swap(supported_protocols, temp_protocols_set);
continue;
}

Expand Down
32 changes: 19 additions & 13 deletions swarm/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub use select::ConnectionHandlerSelect;
use crate::StreamProtocol;
use core::slice;
use libp2p_core::Multiaddr;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::ops::Not;
use std::{error, fmt, io, task::Context, task::Poll, time::Duration};

Expand Down Expand Up @@ -333,22 +333,28 @@ pub enum ProtocolsChange<'a> {
impl<'a> ProtocolsChange<'a> {
/// Compute the [`ProtocolsChange`]s required to go from `existing_protocols` to `new_protocols`.
pub(crate) fn from_full_sets<T: AsRef<str>>(
existing_protocols: &HashSet<AsStrHashEq<T>>,
new_protocols: &HashSet<AsStrHashEq<T>>,
existing_protocols: &mut HashMap<AsStrHashEq<T>, bool>,
new_protocols: impl IntoIterator<Item = T>,
temp_owner: &'a mut Vec<StreamProtocol>,
) -> impl Iterator<Item = Self> {
temp_owner.clear();
temp_owner.extend(
new_protocols
.difference(existing_protocols)
.find_map(|p| StreamProtocol::try_from_owned(p.0.as_ref().to_owned()).ok()),
);
existing_protocols.values_mut().for_each(|v| *v = false);
for new_protocol in new_protocols {
*existing_protocols
.entry(AsStrHashEq(new_protocol))
.or_insert_with_key(|k| {
temp_owner.extend(StreamProtocol::try_from_owned(k.0.as_ref().to_owned()).ok());
false
}) = true;
}

let added_count = temp_owner.len();
temp_owner.extend(
existing_protocols
.difference(new_protocols)
.find_map(|p| StreamProtocol::try_from_owned(p.0.as_ref().to_owned()).ok()),
);
existing_protocols.retain(|k, v| {
if !*v {
temp_owner.push(StreamProtocol::try_from_owned(k.0.as_ref().to_owned()).unwrap());
}
*v
});

let (added, removed) = temp_owner.split_at(added_count);
added
Expand Down

0 comments on commit 98b2eb1

Please sign in to comment.