Skip to content

Commit

Permalink
Merge pull request #284 from freedomlayer/real/fix/connection-attempt…
Browse files Browse the repository at this point in the history
…-timeout

Add basic timeout to connection handshake
  • Loading branch information
realcr authored Apr 12, 2020
2 parents 793e1ca + e618977 commit f4c995a
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 76 deletions.
2 changes: 0 additions & 2 deletions components/bin/src/stindex/net_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,6 @@ where
client_conns_sender,
incoming_client_transform,
max_concurrent_encrypt,
spawner.clone(),
)
.map_err(|e| error!("client incoming transform_pool_loop() error: {:?}", e))
.map(|_| ());
Expand All @@ -298,7 +297,6 @@ where
server_conns_sender,
incoming_server_transform,
max_concurrent_encrypt,
spawner.clone(),
)
.map_err(|e| error!("server incoming transform_pool_loop() error: {:?}", e))
.map(|_| ());
Expand Down
1 change: 0 additions & 1 deletion components/bin/src/stnode/net_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ where
incoming_apps_sender,
app_conn_transform,
max_concurrent_incoming_apps,
spawner.clone(),
)
.map_err(|e| error!("transform_pool_loop() error: {:?}", e))
.map(|_| ());
Expand Down
5 changes: 2 additions & 3 deletions components/bin/src/strelay/net_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use derive_more::*;
use common::conn::{BoxFuture, ConnPairVec, FutTransform};
use common::transform_pool::transform_pool_loop;

use proto::consts::{CONN_TIMEOUT_TICKS, KEEPALIVE_TICKS};
use proto::consts::{KEEPALIVE_TICKS, RELAY_CONN_TIMEOUT_TICKS};
use proto::crypto::PublicKey;

use crypto::rand::CryptoRandom;
Expand Down Expand Up @@ -103,7 +103,6 @@ where
enc_conns_sender,
transform,
max_concurrent_encrypt,
spawner.clone(),
)
.map_err(|e| error!("transform_pool_loop() error: {:?}", e))
.map(|_| ());
Expand All @@ -115,7 +114,7 @@ where
relay_server(
incoming_enc_conns,
timer_client,
CONN_TIMEOUT_TICKS,
RELAY_CONN_TIMEOUT_TICKS,
KEEPALIVE_TICKS,
spawner.clone(),
)
Expand Down
1 change: 0 additions & 1 deletion components/channeler/src/listen_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ where
outgoing_conns,
c_encrypt_transform,
c_max_concurrent_encrypt,
c_spawner.clone(),
)
.map_err(|e| error!("transform_pool_loop: {:?}", e))
.map(|_| ());
Expand Down
71 changes: 12 additions & 59 deletions components/common/src/transform_pool.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,39 @@
use futures::channel::mpsc;
use futures::stream::select;
use futures::task::{Spawn, SpawnExt};
use futures::{future, stream, Sink, SinkExt, Stream, StreamExt};
use futures::{Sink, SinkExt, Stream, StreamExt};

use crate::conn::FutTransform;

#[derive(Debug)]
pub enum TransformPoolLoopError {
SpawnError,
}

enum TransformPoolEvent<I> {
Incoming(I),
IncomingClosed,
TransformDone,
}
pub enum TransformPoolLoopError {}

/// Transform a stream of incoming items to outgoing items.
/// The transformation is asynchronous, therefore outgoing items
/// might not be in the same order in which the incoming items entered.
///
/// max_concurrent is the maximum amount of concurrent transformations.
pub async fn transform_pool_loop<IN, OUT, I, O, T, S>(
pub async fn transform_pool_loop<IN, OUT, I, O, T>(
incoming: I,
outgoing: O,
transform: T,
max_concurrent: usize,
spawner: S,
) -> Result<(), TransformPoolLoopError>
where
IN: Send + 'static,
OUT: Send,
T: FutTransform<Input = IN, Output = Option<OUT>> + Clone + Send + 'static,
I: Stream<Item = IN> + Unpin,
O: Sink<OUT> + Clone + Send + Unpin + 'static,
S: Spawn,
{
let incoming = incoming
.map(TransformPoolEvent::Incoming)
.chain(stream::once(future::ready(
TransformPoolEvent::IncomingClosed,
)));

let (close_sender, close_receiver) = mpsc::channel::<()>(0);
let close_receiver = close_receiver.map(|()| TransformPoolEvent::TransformDone);

let mut incoming_events = select(incoming, close_receiver);
let mut num_concurrent: usize = 0;
let mut incoming_closed = false;
while let Some(event) = incoming_events.next().await {
match event {
TransformPoolEvent::Incoming(input_value) => {
if num_concurrent >= max_concurrent {
warn!("transform_pool_loop: Dropping connection: max_concurrent exceeded");
// We drop the input value because we don't have any room to process it.
continue;
incoming
.for_each_concurrent(Some(max_concurrent), move |input_value| {
let mut c_outgoing = outgoing.clone();
let mut c_transform = transform.clone();
async move {
if let Some(output_value) = c_transform.transform(input_value).await {
let _ = c_outgoing.send(output_value).await;
}
num_concurrent = num_concurrent.checked_add(1).unwrap();
let mut c_outgoing = outgoing.clone();
let mut c_transform = transform.clone();
let mut c_close_sender = close_sender.clone();
let fut = async move {
if let Some(output_value) = c_transform.transform(input_value).await {
let _ = c_outgoing.send(output_value).await;
}
let _ = c_close_sender.send(()).await;
};
spawner
.spawn(fut)
.map_err(|_| TransformPoolLoopError::SpawnError)?;
}
TransformPoolEvent::IncomingClosed => {
incoming_closed = true;
}
TransformPoolEvent::TransformDone => {
num_concurrent = num_concurrent.checked_sub(1).unwrap();
}
}
if incoming_closed && num_concurrent == 0 {
break;
}
}
})
.await;
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions components/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
clippy::new_without_default
)]

#[macro_use]
extern crate log;

mod timeout;
mod transforms;

pub use self::transforms::{
Expand Down
56 changes: 56 additions & 0 deletions components/connection/src/timeout.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use futures::{future, select, FutureExt, StreamExt};

use common::conn::{BoxFuture, FutTransform};
use timer::TimerClient;

/// A future transform's wrapper, adding timeout
#[derive(Debug, Clone)]
pub struct TimeoutFutTransform<FT> {
fut_transform: FT,
timer_client: TimerClient,
timeout_ticks: usize,
}

impl<FT> TimeoutFutTransform<FT> {
pub fn new(fut_transform: FT, timer_client: TimerClient, timeout_ticks: usize) -> Self {
Self {
fut_transform,
timer_client,
timeout_ticks,
}
}
}

impl<FT, I, O> FutTransform for TimeoutFutTransform<FT>
where
FT: FutTransform<Input = I, Output = Option<O>> + Send,
I: Send + 'static,
O: Send,
{
type Input = I;
type Output = Option<O>;

fn transform(&mut self, input: Self::Input) -> BoxFuture<'_, Self::Output> {
Box::pin(async move {
let timer_stream = match self.timer_client.request_timer_stream().await {
Ok(timer_stream) => timer_stream,
Err(e) => {
error!("TimeoutTransform: request_timer_stream() error: {:?}", e);
return None;
}
};

// A future that waits `timeout_ticks`:
let fut_timeout = timer_stream
.take(self.timeout_ticks)
.for_each(|_| future::ready(()));
let fut_output = self.fut_transform.transform(input);

// Race execution of `fut_transform` against the timer:
select! {
_fut_timeout = fut_timeout.fuse() => None,
fut_output = fut_output.fuse() => fut_output,
}
})
}
}
26 changes: 17 additions & 9 deletions components/connection/src/transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ use keepalive::KeepAliveChannel;
use secure_channel::SecureChannel;
use version::VersionPrefix;

use crate::timeout::TimeoutFutTransform;

/// Amount of ticks we allocate to perform a complete handshake.
pub const CONN_TIMEOUT_TICKS: usize = 8;

/// Create an encrypt-keepalive transformation:
/// Composes: Encryption * Keepalive
pub fn create_encrypt_keepalive<R, S>(
Expand All @@ -39,11 +44,11 @@ where
TICKS_TO_REKEY,
spawner.clone(),
);
let keepalive_transform = KeepAliveChannel::new(timer_client, KEEPALIVE_TICKS, spawner);
let keepalive_transform = KeepAliveChannel::new(timer_client.clone(), KEEPALIVE_TICKS, spawner);

// Note that this transform does not contain the version prefix, as it is applied to a
// connection between two nodes, relayed using a relay server.
FuncFutTransform::new(move |(opt_public_key, conn_pair_vec)| {
let fut_transform = FuncFutTransform::new(move |(opt_public_key, conn_pair_vec)| {
let mut c_encrypt_transform = encrypt_transform.clone();
let mut c_keepalive_transform = keepalive_transform.clone();
Box::pin(async move {
Expand All @@ -53,7 +58,8 @@ where
let conn_pair_vec = c_keepalive_transform.transform(conn_pair_vec).await;
Some((public_key, conn_pair_vec))
})
})
});
TimeoutFutTransform::new(fut_transform, timer_client, CONN_TIMEOUT_TICKS)
}

/// Turn a regular connector into a secure connector.
Expand Down Expand Up @@ -81,9 +87,9 @@ where
TICKS_TO_REKEY,
spawner.clone(),
);
let keepalive_transform = KeepAliveChannel::new(timer_client, KEEPALIVE_TICKS, spawner);
let keepalive_transform = KeepAliveChannel::new(timer_client.clone(), KEEPALIVE_TICKS, spawner);

FuncFutTransform::new(move |(opt_public_key, conn_pair)| {
let fut_transform = FuncFutTransform::new(move |(opt_public_key, conn_pair)| {
let mut c_version_transform = version_transform.clone();
let mut c_encrypt_transform = encrypt_transform.clone();
let mut c_keepalive_transform = keepalive_transform.clone();
Expand All @@ -95,7 +101,8 @@ where
let conn_pair = c_keepalive_transform.transform(conn_pair).await;
Some((public_key, conn_pair))
})
})
});
TimeoutFutTransform::new(fut_transform, timer_client, CONN_TIMEOUT_TICKS)
}

// TODO: Possibly remove in favour of create_version_encrypt_keepalive
Expand All @@ -114,9 +121,9 @@ where
C: FutTransform<Input = NetAddress, Output = Option<ConnPairVec>> + Clone + Send + 'static,
{
let conn_transform =
create_version_encrypt_keepalive(timer_client, identity_client, rng, spawner);
create_version_encrypt_keepalive(timer_client.clone(), identity_client, rng, spawner);

FuncFutTransform::new(move |(public_key, net_address)| {
let fut_transform = FuncFutTransform::new(move |(public_key, net_address)| {
let mut c_connector = connector.clone();
let mut c_conn_transform = conn_transform.clone();
Box::pin(async move {
Expand All @@ -126,5 +133,6 @@ where
.await?;
Some(conn_pair)
})
})
});
TimeoutFutTransform::new(fut_transform, timer_client, CONN_TIMEOUT_TICKS)
}
2 changes: 1 addition & 1 deletion components/proto/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub const KEEPALIVE_TICKS: usize = 0x20;

/// Relay server: The amount of ticks to wait before a relay connection from a client
/// sends identification of which type of connection it is.
pub const CONN_TIMEOUT_TICKS: usize = 4;
pub const RELAY_CONN_TIMEOUT_TICKS: usize = 4;

/// The stream TCP connection is split into prefix length frames. This is the maximum allowed
/// length for such frame, measured in bytes.
Expand Down

0 comments on commit f4c995a

Please sign in to comment.