Skip to content

Commit

Permalink
outbound: Return a default endpoint on reject (#690)
Browse files Browse the repository at this point in the history
When the resolver rejects resolution, we currently propagate that error
so that it can be handled via fallback. And due to recent HTTP router
changes, these resolution errors can propagate up across splits, etc.

This change simplifies this behavior by isntead synthesizing a
resolution with a default endpoint.

The `not_http` reason has been removed, as it's no longer useful.
  • Loading branch information
olix0r authored Oct 2, 2020
1 parent 86c71ea commit 9870faa
Show file tree
Hide file tree
Showing 11 changed files with 134 additions and 68 deletions.
23 changes: 0 additions & 23 deletions linkerd/app/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,6 @@ pub struct ProxyMetrics {
pub transport: transport::Metrics,
}

#[derive(Clone, Debug)]
pub struct DiscoveryRejected(());

impl DiscoveryRejected {
pub fn new() -> Self {
DiscoveryRejected(())
}
}

impl std::fmt::Display for DiscoveryRejected {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "discovery rejected")
}
}

impl std::error::Error for DiscoveryRejected {}

impl From<Addr> for DiscoveryRejected {
fn from(_: Addr) -> Self {
Self::new()
}
}

#[derive(Clone, Debug, Default)]
pub struct SkipByPort(std::sync::Arc<indexmap::IndexSet<u16>>);

Expand Down
16 changes: 8 additions & 8 deletions linkerd/app/integration/src/tests/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ mod transport {
// Connection to the server should be a failure with the EXFULL error
// code.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
"tcp_close_total{peer=\"dst\",direction=\"inbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
// Connection from the client should have closed cleanly.
assert_eventually_contains!(
metrics.get("/metrics").await,
Expand Down Expand Up @@ -963,7 +963,7 @@ mod transport {
// Connection to the server should be a failure with the EXFULL error
// code.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"EXFULL\"} 1");
"tcp_close_total{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"EXFULL\"} 1");
// Connection from the client should have closed cleanly.
assert_eventually_contains!(metrics.get("/metrics").await,
"tcp_close_total{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
Expand Down Expand Up @@ -1121,7 +1121,7 @@ mod transport {
tcp_client.write(TcpFixture::HELLO_MSG).await;
assert_eq!(tcp_client.read().await, TcpFixture::BYE_MSG.as_bytes());
let expected = format!(
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} 1",
"tcp_open_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} 1",
proxy.outbound_server.as_ref().unwrap().addr,
);
assert_eventually_contains!(metrics.get("/metrics").await, &expected);
Expand Down Expand Up @@ -1184,7 +1184,7 @@ mod transport {
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");

let tcp_client = client.connect().await;

Expand All @@ -1194,14 +1194,14 @@ mod transport {
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 1");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 1");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 1");

tcp_client.shutdown().await;
let out = metrics.get("/metrics").await;
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"src\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"loopback\",errno=\"\"} 2");
assert_eventually_contains!(out,
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\",errno=\"\"} 2");
"tcp_connection_duration_ms_count{peer=\"dst\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\",errno=\"\"} 2");
}

#[tokio::test]
Expand All @@ -1217,7 +1217,7 @@ mod transport {
TcpFixture::BYE_MSG.len()
);
let dst_expected = format!(
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
"tcp_write_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
proxy.outbound_server.as_ref().unwrap().addr,
TcpFixture::HELLO_MSG.len()
);
Expand Down Expand Up @@ -1246,7 +1246,7 @@ mod transport {
TcpFixture::HELLO_MSG.len()
);
let dst_expected = format!(
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_http\"}} {}",
"tcp_read_bytes_total{{peer=\"dst\",authority=\"{}\",direction=\"outbound\",tls=\"no_identity\",no_tls_reason=\"not_provided_by_service_discovery\"}} {}",
proxy.outbound_server.as_ref().unwrap().addr,
TcpFixture::BYE_MSG.len()
);
Expand Down
12 changes: 10 additions & 2 deletions linkerd/app/outbound/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ impl Into<Addr> for &'_ HttpConcrete {
}
}

impl Into<SocketAddr> for &'_ HttpConcrete {
fn into(self) -> SocketAddr {
self.dst
.socket_addr()
.unwrap_or_else(|| self.logical.orig_dst)
}
}

impl std::fmt::Display for HttpConcrete {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.dst.fmt(f)
Expand Down Expand Up @@ -201,7 +209,7 @@ impl From<HttpLogical> for HttpEndpoint {
tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(),
),
concrete: logical.into(),
metadata: Metadata::empty(),
metadata: Metadata::default(),
}
}
}
Expand Down Expand Up @@ -331,7 +339,7 @@ impl From<SocketAddr> for TcpEndpoint {
Self {
addr,
dst: addr.into(),
identity: Conditional::None(tls::ReasonForNoPeerName::NotHttp.into()),
identity: Conditional::None(tls::ReasonForNoPeerName::PortSkipped.into()),
labels: None,
}
}
Expand Down
15 changes: 3 additions & 12 deletions linkerd/app/outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use linkerd2_app_core::{
spans::SpanConverter,
svc::{self},
transport::{self, listen, tls},
Conditional, DiscoveryRejected, Error, ProxyMetrics, StackMetrics, TraceContextLayer,
CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER,
DST_OVERRIDE_HEADER, L5D_REQUIRE_ID,
};
use std::{collections::HashMap, net, time::Duration};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -500,13 +500,6 @@ impl Config {

// Load balances TCP streams that cannot be decoded as HTTP.
let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve))
.push_fallback_with_predicate(
tcp_forward
.clone()
.push_map_target(TcpEndpoint::from)
.into_inner(),
is_discovery_rejected,
)
.push_on_response(
svc::layers()
.push_failfast(dispatch_timeout)
Expand Down Expand Up @@ -591,9 +584,7 @@ pub fn trace_labels() -> HashMap<String, String> {

fn is_discovery_rejected(err: &Error) -> bool {
fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool {
err.is::<DiscoveryRejected>()
|| err.is::<profiles::InvalidProfileAddr>()
|| err.source().map(is_rejected).unwrap_or(false)
err.is::<profiles::InvalidProfileAddr>() || err.source().map(is_rejected).unwrap_or(false)
}

let rejected = is_rejected(&**err);
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/outbound/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn plaintext_tcp() {
let resolver = test_support::resolver().endpoint_exists(
logical.clone(),
target_addr,
test_support::resolver::Metadata::empty(),
test_support::resolver::Metadata::default(),
);

// Build the outbound TCP balancer stack.
Expand Down
59 changes: 59 additions & 0 deletions linkerd/app/src/dst/default_resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use super::Rejected;
use futures::{future, prelude::*, stream};
use linkerd2_app_core::{
proxy::core::{Resolve, Update},
svc, Error,
};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};

pub fn layer<S>() -> impl svc::Layer<S, Service = RecoverDefaultResolve<S>> + Clone {
svc::layer::mk(RecoverDefaultResolve)
}

#[derive(Clone, Debug)]
pub struct RecoverDefaultResolve<S>(S);

impl<T, S> tower::Service<T> for RecoverDefaultResolve<S>
where
for<'t> &'t T: Into<std::net::SocketAddr>,
S: Resolve<T, Error = Error>,
S::Endpoint: Default + Send + 'static,
S::Resolution: Send + 'static,
S::Future: Send + 'static,
stream::Once<future::Ready<Result<Update<S::Endpoint>, S::Error>>>:
stream::TryStream<Ok = Update<S::Endpoint>, Error = S::Error>,
{
type Response = future::Either<
S::Resolution,
stream::Once<future::Ready<Result<Update<S::Endpoint>, Error>>>,
>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Error>> + Send + 'static>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}

fn call(&mut self, t: T) -> Self::Future {
let addr = (&t).into();
Box::pin(
self.0
.resolve(t)
.map_ok(future::Either::Left)
.or_else(move |error| {
if Rejected::matches(&*error) {
tracing::debug!(%error, %addr, "Synthesizing endpoint");
let endpoint = (addr, S::Endpoint::default());
let res = stream::once(future::ok(Update::Reset(vec![endpoint])));
future::ok(future::Either::Right(res))
} else {
future::err(error)
}
}),
)
}
}
40 changes: 38 additions & 2 deletions linkerd/app/src/dst/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod default_resolve;
mod permit;
mod resolve;

use self::default_resolve::RecoverDefaultResolve;
use indexmap::IndexSet;
use linkerd2_app_core::{
control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls,
ControlHttpMetrics, Error,
Addr, ControlHttpMetrics, Error,
};
use permit::PermitConfiguredDsts;
use std::time::Duration;
Expand All @@ -21,6 +23,9 @@ pub struct Config {
pub initial_profile_timeout: Duration,
}

#[derive(Clone, Debug)]
pub struct Rejected(());

/// Handles to destination service clients.
///
/// The addr is preserved for logging.
Expand All @@ -30,7 +35,9 @@ pub struct Dst {
PermitConfiguredDsts<profiles::InvalidProfileAddr>,
profiles::Client<control::Client<BoxBody>, resolve::BackoffUnlessInvalidArgument>,
>,
pub resolve: RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
pub resolve: RecoverDefaultResolve<
RequestFilter<PermitConfiguredDsts, resolve::Resolve<control::Client<BoxBody>>>,
>,
}

impl Config {
Expand All @@ -48,6 +55,7 @@ impl Config {
self.get_suffixes,
self.get_networks,
)))
.push(default_resolve::layer())
.into_inner();

let profiles = svc::stack(profiles::Client::new(
Expand All @@ -69,3 +77,31 @@ impl Config {
})
}
}

impl std::fmt::Display for Rejected {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "rejected discovery")
}
}

impl std::error::Error for Rejected {}

impl From<Addr> for Rejected {
fn from(_: Addr) -> Self {
Rejected(())
}
}

impl Rejected {
fn matches(err: &(dyn std::error::Error + 'static)) -> bool {
if err.is::<Self>() {
return true;
}

if let Some(status) = err.downcast_ref::<tonic::Status>() {
return status.code() == tonic::Code::InvalidArgument;
}

err.source().map(Self::matches).unwrap_or(false)
}
}
9 changes: 4 additions & 5 deletions linkerd/app/src/dst/permit.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use super::Rejected;
use ipnet::{Contains, IpNet};
use linkerd2_app_core::{
dns::Suffix, request_filter::FilterRequest, Addr, DiscoveryRejected, Error,
};
use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error};
use std::marker::PhantomData;
use std::net::IpAddr;
use std::sync::Arc;

pub struct PermitConfiguredDsts<E = DiscoveryRejected> {
pub struct PermitConfiguredDsts<E = Rejected> {
name_suffixes: Arc<Vec<Suffix>>,
networks: Arc<Vec<IpNet>>,
_error: PhantomData<fn(E)>,
Expand Down Expand Up @@ -74,7 +73,7 @@ where
if permitted {
Ok(t)
} else {
Err(E::from(addr.clone()).into())
Err(E::from(addr).into())
}
}
}
15 changes: 7 additions & 8 deletions linkerd/app/src/dst/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use linkerd2_app_core::{
api_resolve as api,
resolve::{self, recover},
},
DiscoveryRejected, Error, Recover,
Error, Recover,
};
use tonic::{
body::{Body, BoxBody},
Expand Down Expand Up @@ -46,16 +46,15 @@ impl From<ExponentialBackoff> for BackoffUnlessInvalidArgument {
impl Recover<Error> for BackoffUnlessInvalidArgument {
type Backoff = ExponentialBackoffStream;

fn recover(&self, err: Error) -> Result<Self::Backoff, Error> {
match err.downcast::<Status>() {
Ok(ref status) if status.code() == Code::InvalidArgument => {
tracing::debug!(message = "cannot recover", %status);
return Err(DiscoveryRejected::new().into());
fn recover(&self, error: Error) -> Result<Self::Backoff, Error> {
if let Some(status) = error.downcast_ref::<Status>() {
if status.code() == Code::InvalidArgument {
tracing::debug!(%status, "Cannot recover");
return Err(error);
}
Ok(status) => tracing::trace!(message = "recovering", %status),
Err(error) => tracing::trace!(message = "recovering", %error),
}

tracing::trace!(%error, "Recovering");
Ok(self.0.stream())
}
}
6 changes: 4 additions & 2 deletions linkerd/proxy/api-resolve/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ pub enum ProtocolHint {

// === impl Metadata ===

impl Metadata {
pub fn empty() -> Self {
impl Default for Metadata {
fn default() -> Self {
Self {
labels: IndexMap::default(),
protocol_hint: ProtocolHint::Unknown,
Expand All @@ -51,7 +51,9 @@ impl Metadata {
authority_override: None,
}
}
}

impl Metadata {
pub fn new(
labels: IndexMap<String, String>,
protocol_hint: ProtocolHint,
Expand Down
Loading

0 comments on commit 9870faa

Please sign in to comment.