Skip to content

Commit

Permalink
internal: Remove the SetKeepalive trait (#363)
Browse files Browse the repository at this point in the history
In preparation of decoupling the listener logic from TLS discovery, I'm
trying to simplify the transport module.

This change is intended to make the `listen` and `connect` modules
primarily responsible for preparing a `TCPStream` before any other
transformations are applied to the IO stream. The `internal::Io` trait
can be narrowed and the various `Io` impls no longer need to proxy this
API.

In follow-up changes, the `listen` module will be decoupled from
identity/TLS; and some of the `transport` core types will be split out
into subcrates.
  • Loading branch information
olix0r authored Sep 27, 2019
1 parent b7faa70 commit 4c3d706
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 241 deletions.
9 changes: 3 additions & 6 deletions src/app/inbound/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::proxy::http::{
strip_header,
};
use crate::proxy::{accept, Server};
use crate::transport::{self, connect, keepalive, tls, Connection};
use crate::transport::{self, connect, tls, Connection};
use crate::{core::listen::ServeConnection, svc, trace_context, Addr};
use linkerd2_reconnect as reconnect;
use opencensus_proto::trace::v1 as oc;
Expand Down Expand Up @@ -55,9 +55,8 @@ where

// Establishes connections to the local application (for both
// TCP forwarding and HTTP proxying).
let connect = svc::stack(connect::svc())
let connect = svc::stack(connect::svc(config.inbound_connect_keepalive))
.push(tls::client::layer(local_identity))
.push(keepalive::connect::layer(config.inbound_connect_keepalive))
.push_timeout(config.inbound_connect_timeout)
.push(transport_metrics.connect("inbound"))
.push(rewrite_loopback_addr::layer());
Expand Down Expand Up @@ -212,9 +211,7 @@ where

// As the inbound proxy accepts connections, we don't do any
// special transport-level handling.
let accept = accept::builder()
.push(keepalive::accept::layer(config.inbound_accept_keepalive))
.push(transport_metrics.accept("inbound"));
let accept = accept::builder().push(transport_metrics.accept("inbound"));

Server::new(
"out",
Expand Down
47 changes: 30 additions & 17 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{handle_time, inbound, outbound, tap::serve_tap};
use crate::opencensus::SpanExporter;
use crate::proxy::{self, http::metrics as http_metrics};
use crate::svc::{self, LayerExt};
use crate::transport::{self, connect, keepalive, tls, GetOriginalDst, Listen};
use crate::transport::{self, connect, tls, GetOriginalDst, Listen};
use crate::{dns, drain, logging, metrics::FmtMetrics, tap, task, telemetry, trace, Conditional};
use futures::{self, future, Future};
use linkerd2_reconnect as reconnect;
Expand Down Expand Up @@ -70,29 +70,40 @@ where
let local_identity = identity.as_ref().map(|(l, _)| l.clone());

let control_listener = config.control_listener.as_ref().map(|cl| {
let listener = Listen::bind(cl.listener.addr, local_identity.clone())
.expect("dst_svc listener bind");
let listener = Listen::bind(
cl.listener.addr,
local_identity.clone(),
config.inbound_accept_keepalive,
)
.expect("tap listener bind");

(listener, cl.tap_svc_name.clone())
});

let admin_listener = Listen::bind(config.admin_listener.addr, local_identity.clone())
.expect("metrics listener bind");
let admin_listener = Listen::bind(
config.admin_listener.addr,
local_identity.clone(),
config.inbound_accept_keepalive,
)
.expect("tap listener bind");

let outbound_listener = Listen::bind(
config.outbound_listener.addr,
Conditional::None(tls::ReasonForNoPeerName::Loopback.into()),
config.outbound_accept_keepalive,
)
.expect("outbound listener bind")
.with_original_dst(get_original_dst.clone())
.without_protocol_detection_for(config.outbound_ports_disable_protocol_detection.clone());

let inbound_listener = Listen::bind(config.inbound_listener.addr, local_identity)
.expect("inbound listener bind")
.with_original_dst(get_original_dst.clone())
.without_protocol_detection_for(
config.inbound_ports_disable_protocol_detection.clone(),
);
let inbound_listener = Listen::bind(
config.inbound_listener.addr,
local_identity,
config.inbound_accept_keepalive,
)
.expect("inbound listener bind")
.with_original_dst(get_original_dst.clone())
.without_protocol_detection_for(config.inbound_ports_disable_protocol_detection.clone());

let runtime = runtime.into();

Expand Down Expand Up @@ -265,11 +276,10 @@ where
config.outbound_connect_keepalive
};

let svc = svc::stack(connect::svc())
let svc = svc::stack(connect::svc(keepalive))
.push(tls::client::layer(Conditional::Some(
id_config.trust_anchors.clone(),
)))
.push(keepalive::connect::layer(keepalive))
.push_timeout(config.control_connect_timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
Expand Down Expand Up @@ -318,9 +328,8 @@ where
config.outbound_connect_keepalive
};

svc::stack(connect::svc())
svc::stack(connect::svc(keepalive))
.push(tls::client::layer(local_identity.clone()))
.push(keepalive::connect::layer(keepalive))
.push_timeout(config.control_connect_timeout)
.push(control::client::layer())
.push(control::resolve::layer(dns_resolver.clone()))
Expand Down Expand Up @@ -406,9 +415,13 @@ where
);

let trace_collector_svc = config.trace_collector_addr.as_ref().map(|addr| {
svc::stack(connect::svc())
let keepalive = if addr.addr.is_loopback() {
config.inbound_connect_keepalive
} else {
config.outbound_connect_keepalive
};
svc::stack(connect::svc(keepalive))
.push(tls::client::layer(local_identity.clone()))
.push(keepalive::connect::layer(config.outbound_connect_keepalive))
.push_timeout(config.control_connect_timeout)
// TODO: perhaps rename from "control" to "grpc"
.push(control::client::layer())
Expand Down
9 changes: 3 additions & 6 deletions src/app/outbound/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::proxy::http::{
};
use crate::proxy::{self, accept, Server};
use crate::transport::Connection;
use crate::transport::{self, connect, keepalive, tls};
use crate::transport::{self, connect, tls};
use crate::{svc, trace_context, Addr};
use linkerd2_proxy_discover as discover;
use linkerd2_reconnect as reconnect;
Expand Down Expand Up @@ -72,9 +72,8 @@ where

// Establishes connections to remote peers (for both TCP
// forwarding and HTTP proxying).
let connect = svc::stack(connect::svc())
let connect = svc::stack(connect::svc(config.outbound_connect_keepalive))
.push(tls::client::layer(local_identity))
.push(keepalive::connect::layer(config.outbound_connect_keepalive))
.push_timeout(config.outbound_connect_timeout)
.push(transport_metrics.connect("outbound"));

Expand Down Expand Up @@ -285,9 +284,7 @@ where

// Instantiated for each TCP connection received from the local
// application (including HTTP connections).
let accept = accept::builder()
.push(keepalive::accept::layer(config.outbound_accept_keepalive))
.push(transport_metrics.accept("outbound"));
let accept = accept::builder().push(transport_metrics.accept("outbound"));

Server::new(
"out",
Expand Down
8 changes: 6 additions & 2 deletions src/transport/connect.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::svc::{mk, Service};
use futures::{try_ready, Future, Poll};
use std::{io, net::SocketAddr};
use std::{io, net::SocketAddr, time::Duration};
use tokio::net::{tcp, TcpStream};
use tracing::debug;

Expand All @@ -9,15 +9,17 @@ pub trait HasPeerAddr {
}

pub fn svc<T>(
keepalive: Option<Duration>,
) -> impl Service<T, Response = TcpStream, Error = io::Error, Future = ConnectFuture> + Clone
where
T: HasPeerAddr,
{
mk(|target: T| {
mk(move |target: T| {
let addr = target.peer_addr();
debug!("connecting to {}", addr);
ConnectFuture {
addr,
keepalive,
future: TcpStream::connect(&addr),
}
})
Expand All @@ -26,6 +28,7 @@ where
#[derive(Debug)]
pub struct ConnectFuture {
addr: SocketAddr,
keepalive: Option<Duration>,
future: tcp::ConnectFuture,
}

Expand All @@ -48,6 +51,7 @@ impl Future for ConnectFuture {
}));
debug!("connection established to {}", self.addr);
super::set_nodelay_or_warn(&io);
super::set_keepalive_or_warn(&io, self.keepalive);
Ok(io.into())
}
}
26 changes: 3 additions & 23 deletions src/transport/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use futures::Poll;
use tokio::io::{AsyncRead, AsyncWrite};

use self::internal::Io;
use super::{AddrInfo, SetKeepalive};
use super::AddrInfo;

/// A public wrapper around a `Box<Io>`.
///
Expand Down Expand Up @@ -76,18 +76,8 @@ impl AddrInfo for BoxedIo {
}
}

impl SetKeepalive for BoxedIo {
fn keepalive(&self) -> io::Result<Option<::std::time::Duration>> {
self.0.keepalive()
}

fn set_keepalive(&mut self, ka: Option<::std::time::Duration>) -> io::Result<()> {
self.0.set_keepalive(ka)
}
}

pub(super) mod internal {
use super::{AddrInfo, AsyncRead, AsyncWrite, Buf, Poll, SetKeepalive, Shutdown};
use super::{AddrInfo, AsyncRead, AsyncWrite, Buf, Poll, Shutdown};
use std::io;
use tokio::net::TcpStream;

Expand All @@ -96,7 +86,7 @@ pub(super) mod internal {
/// writes.
///
/// Instead, used the concrete `BoxedIo` type.
pub trait Io: AddrInfo + AsyncRead + AsyncWrite + SetKeepalive + Send {
pub trait Io: AddrInfo + AsyncRead + AsyncWrite + Send {
fn shutdown_write(&mut self) -> io::Result<()>;

/// This method is to allow using `Async::write_buf` even through a
Expand Down Expand Up @@ -163,16 +153,6 @@ mod tests {
}
}

impl SetKeepalive for WriteBufDetector {
fn keepalive(&self) -> io::Result<Option<::std::time::Duration>> {
unreachable!("not called in test")
}

fn set_keepalive(&mut self, _: Option<::std::time::Duration>) -> io::Result<()> {
unreachable!("not called in test")
}
}

impl Io for WriteBufDetector {
fn shutdown_write(&mut self) -> Result<(), io::Error> {
unreachable!("not called in test")
Expand Down
124 changes: 0 additions & 124 deletions src/transport/keepalive.rs

This file was deleted.

Loading

0 comments on commit 4c3d706

Please sign in to comment.