From 580cb288da330b1d6247142ee9e91cb03b0350f7 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 2 Oct 2020 13:31:40 -0700 Subject: [PATCH] Skip endpoint resolution when profile lookup is rejected (#691) When profile lookup is rejected, this error propagates to the stack and is handled by falling back to an alternate stack. In order to move profile discovery out of the HTTP stack and onto the initial TCP accept stack, we want to avoid this sort of fallback behavior. So this change modifies profile discovery to make the profile optional. When a profile lookup is rejected/skipped, we now simply return a `None` profile; and in these cases we avoid performing endpoint resolution by omitting a concrete address. --- linkerd/app/inbound/src/endpoint.rs | 27 +- linkerd/app/inbound/src/lib.rs | 5 - linkerd/app/outbound/src/endpoint.rs | 123 +++----- linkerd/app/outbound/src/lib.rs | 98 +++---- linkerd/app/src/dst/default_profile.rs | 42 +++ linkerd/app/src/dst/default_resolve.rs | 6 +- linkerd/app/src/dst/mod.rs | 61 ++-- linkerd/app/src/dst/permit.rs | 115 +++++--- linkerd/app/src/dst/resolve.rs | 2 +- linkerd/proxy/discover/src/buffer.rs | 2 - linkerd/service-profiles/src/client.rs | 35 +-- linkerd/service-profiles/src/discover.rs | 2 +- .../src/http/route_request.rs | 14 +- linkerd/service-profiles/src/lib.rs | 8 +- linkerd/service-profiles/src/split.rs | 273 ++++++++++-------- 15 files changed, 403 insertions(+), 410 deletions(-) create mode 100644 linkerd/app/src/dst/default_profile.rs diff --git a/linkerd/app/inbound/src/endpoint.rs b/linkerd/app/inbound/src/endpoint.rs index 8b42717f34..4f469a8266 100644 --- a/linkerd/app/inbound/src/endpoint.rs +++ b/linkerd/app/inbound/src/endpoint.rs @@ -7,7 +7,7 @@ use linkerd2_app_core::{ transport::{listen, tls}, Addr, Conditional, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, }; -use std::{convert::TryInto, fmt, net::SocketAddr, sync::Arc}; +use std::{convert::TryInto, net::SocketAddr, sync::Arc}; use tracing::debug; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -21,7 +21,7 @@ pub struct Target { #[derive(Clone, Debug)] pub struct Logical { target: Target, - profiles: profiles::Receiver, + profiles: Option, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -114,18 +114,13 @@ pub(super) fn route((route, logical): (profiles::http::Route, Logical)) -> dst:: // === impl Target === +/// Used for profile discovery. impl Into for &'_ Target { fn into(self) -> Addr { self.dst.clone() } } -impl AsRef for Target { - fn as_ref(&self) -> &Addr { - &self.dst - } -} - impl tls::HasPeerIdentity for Target { fn peer_identity(&self) -> tls::PeerIdentity { Conditional::None(tls::ReasonForNoPeerName::Loopback.into()) @@ -197,12 +192,6 @@ impl tap::Inspect for Target { } } -impl fmt::Display for Target { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.dst.fmt(f) - } -} - impl stack_tracing::GetSpan<()> for Target { fn get_span(&self, _: &()) -> tracing::Span { use tracing::info_span; @@ -275,14 +264,14 @@ impl From for Target { // === impl Logical === -impl From<(profiles::Receiver, Target)> for Logical { - fn from((profiles, target): (profiles::Receiver, Target)) -> Self { +impl From<(Option, Target)> for Logical { + fn from((profiles, target): (Option, Target)) -> Self { Self { profiles, target } } } -impl AsRef for Logical { - fn as_ref(&self) -> &profiles::Receiver { - &self.profiles +impl Into> for &'_ Logical { + fn into(self) -> Option { + self.profiles.clone() } } diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index fbb0e9f346..e92de2644b 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -248,14 +248,9 @@ impl Config { .push_on_response(svc::layers().box_http_response()) .check_new_service::>(); - let forward = target - .instrument(|_: &Target| debug_span!("forward")) - .check_new_service::>(); - // Attempts to resolve the target as a service profile or, if that // fails, skips that stack to forward to the local endpoint. profile - .push_fallback(forward) .check_new_service::>() // If the traffic is targeted at the inbound port, send it through // the loopback service (i.e. as a gateway). diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index 07e84a722a..ef4d1297d4 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -1,8 +1,7 @@ use crate::http::uri::Authority; use indexmap::IndexMap; use linkerd2_app_core::{ - dst, - metric_labels, + dst, metric_labels, metric_labels::{prefix_labels, EndpointLabels, TlsStatus}, profiles, proxy::{ @@ -15,8 +14,7 @@ use linkerd2_app_core::{ }, router, transport::{listen, tls}, - Addr, - Conditional, //L5D_REQUIRE_ID, + Addr, Conditional, }; use std::{net::SocketAddr, sync::Arc}; @@ -38,7 +36,7 @@ pub struct HttpLogical { #[derive(Clone, Debug, Eq, PartialEq)] pub struct HttpConcrete { - pub dst: Addr, + pub resolve: Option, pub logical: HttpLogical, } @@ -47,7 +45,7 @@ pub struct LogicalPerRequest(HttpAccept); #[derive(Clone, Debug)] pub struct Profile { - pub rx: profiles::Receiver, + pub rx: Option, pub logical: HttpLogical, } @@ -81,21 +79,17 @@ impl From for TcpLogical { } } +/// Used as a default destination when resolution is rejected. impl Into for &'_ TcpLogical { fn into(self) -> SocketAddr { self.addr } } -impl Into for &'_ TcpLogical { - fn into(self) -> Addr { - self.addr.into() - } -} - -impl std::fmt::Display for TcpLogical { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.addr.fmt(f) +/// Used to resolve endpoints. +impl Into> for &'_ TcpLogical { + fn into(self) -> Option { + Some(self.addr.into()) } } @@ -118,102 +112,63 @@ impl Into for &'_ HttpAccept { // === impl HttpConrete === -impl From<(Addr, Profile)> for HttpConcrete { - fn from((dst, Profile { logical, .. }): (Addr, Profile)) -> Self { - Self { dst, logical } - } -} - -impl AsRef for HttpConcrete { - fn as_ref(&self) -> &Addr { - &self.dst +impl From<(Option, Profile)> for HttpConcrete { + fn from((resolve, Profile { logical, .. }): (Option, Profile)) -> Self { + Self { resolve, logical } } } -impl Into for &'_ HttpConcrete { - fn into(self) -> Addr { - self.dst.clone() +/// Produces an address to resolve to individual endpoints. This address is only +/// present if the initial profile resolution was not rejected. +impl Into> for &'_ HttpConcrete { + fn into(self) -> Option { + self.resolve.clone() } } +/// Produces an address to be used if resolution is rejected. impl Into for &'_ HttpConcrete { fn into(self) -> SocketAddr { - self.dst - .socket_addr() + self.resolve + .as_ref() + .and_then(|a| a.socket_addr()) .unwrap_or_else(|| self.logical.orig_dst) } } -impl std::fmt::Display for HttpConcrete { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.dst.fmt(f) - } -} - -impl From for HttpConcrete { - fn from(logical: HttpLogical) -> Self { - Self { - dst: logical.dst.clone(), - logical, - } - } -} - // === impl HttpLogical === -impl std::fmt::Display for HttpLogical { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.dst.fmt(f) - } -} - -impl<'t> From<&'t HttpLogical> for http::header::HeaderValue { - fn from(target: &'t HttpLogical) -> Self { - http::header::HeaderValue::from_str(&target.dst.to_string()) - .expect("addr must be a valid header") - } -} - -impl Into for HttpLogical { - fn into(self) -> SocketAddr { - self.orig_dst - } -} - +/// Produces an address for profile discovery. impl Into for &'_ HttpLogical { fn into(self) -> Addr { self.dst.clone() } } +/// Needed for canonicalization. impl AsRef for HttpLogical { fn as_ref(&self) -> &Addr { &self.dst } } +/// Needed for canonicalization. impl AsMut for HttpLogical { fn as_mut(&mut self) -> &mut Addr { &mut self.dst } } -// === impl HttpEndpoint === - -impl From for HttpEndpoint { - fn from(logical: HttpLogical) -> Self { - Self { - addr: logical.orig_dst, - settings: logical.version.into(), - identity: Conditional::None( - tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(), - ), - concrete: logical.into(), - metadata: Metadata::default(), - } +// Used to set the l5d-canonical-dst header. +impl<'t> From<&'t HttpLogical> for http::header::HeaderValue { + fn from(target: &'t HttpLogical) -> Self { + http::header::HeaderValue::from_str(&target.dst.to_string()) + .expect("addr must be a valid header") } } +// === impl HttpEndpoint === + impl std::hash::Hash for HttpEndpoint { fn hash(&self, state: &mut H) { self.addr.hash(state); @@ -461,8 +416,8 @@ pub fn route((route, profile): (profiles::http::Route, Profile)) -> dst::Route { // === impl Profile === -impl From<(profiles::Receiver, HttpLogical)> for Profile { - fn from((rx, logical): (profiles::Receiver, HttpLogical)) -> Self { +impl From<(Option, HttpLogical)> for Profile { + fn from((rx, logical): (Option, HttpLogical)) -> Self { Self { rx, logical } } } @@ -473,14 +428,14 @@ impl AsRef for Profile { } } -impl AsRef for Profile { - fn as_ref(&self) -> &profiles::Receiver { - &self.rx +impl Into for &'_ Profile { + fn into(self) -> Addr { + self.logical.dst.clone() } } -impl From for HttpLogical { - fn from(Profile { logical, .. }: Profile) -> Self { - logical +impl Into> for &'_ Profile { + fn into(self) -> Option { + self.rx.clone() } } diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index e047dc1752..77eda9d1e0 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -20,7 +20,7 @@ use linkerd2_app_core::{ spans::SpanConverter, svc::{self}, transport::{self, listen, tls}, - Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER, + Addr, Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, }; use std::{collections::HashMap, net, time::Duration}; @@ -295,7 +295,7 @@ impl Config { .push(discover::buffer(1_000, cache_max_idle_age)); // Builds a balancer for each concrete destination. - let concrete = svc::stack(endpoint.clone()) + let concrete = svc::stack(endpoint) .check_new_service::>() .push_on_response( svc::layers() @@ -314,8 +314,11 @@ impl Config { .push_failfast(dispatch_timeout) .push(metrics.stack.layer(stack_labels("concrete"))), ) + .instrument(|c: &HttpConcrete| match c.resolve.as_ref() { + None => info_span!("concrete"), + Some(addr) => info_span!("concrete", %addr), + }) .into_new_service() - .instrument(|c: &HttpConcrete| info_span!("concrete", dst = %c.dst)) .check_new_service::>(); // For each logical target, performs service profile resolution and @@ -328,14 +331,24 @@ impl Config { // When no new requests have been dispatched for `cache_max_idle_age`, // the cached service is dropped. In-flight streams will continue to be // processed. - let logical = concrete + concrete // Uses the split-provided target `Addr` to build a concrete target. + .check_new_service::>() .push_map_target(HttpConcrete::from) .push_on_response(svc::layers().push(svc::layer::mk(svc::SpawnReady::new))) + // The concrete address is only set when the profile could be + // resolved. Endpoint resolution is skipped when there is no + // concrete address. + .check_new_service::<(Option, endpoint::Profile), http::Request<_>>() .push(profiles::split::layer()) + .check_new_service::>() // Drives concrete stacks to readiness and makes the split // cloneable, as required by the retry middleware. - .push_on_response(svc::layers().push_spawn_buffer(buffer_capacity)) + .push_on_response( + svc::layers() + .push_failfast(dispatch_timeout) + .push_spawn_buffer(buffer_capacity), + ) .push(profiles::http::route_request::layer( svc::proxies() .push(metrics.http_route_actual.into_layer::()) @@ -351,29 +364,13 @@ impl Config { .push_map_target(endpoint::route) .into_inner(), )) + .check_new_service::>() .push_map_target(endpoint::Profile::from) // Discovers the service profile from the control plane and passes // it to inner stack to build the router and traffic split. .push(profiles::discover::layer(profiles_client)) - .check_new_service::>(); - - // Caches clients that bypass discovery/balancing. - let forward = svc::stack(endpoint) - .instrument(|t: &HttpEndpoint| debug_span!("forward", peer.id = ?t.identity)) - .check_new_service::>(); - - // Attempts to route route request to a logical services that uses - // control plane for discovery. If the discovery is rejected, the - // `forward` stack is used instead, bypassing load balancing, etc. - logical + .check_new_service::>() .push_on_response(svc::layers().box_http_response()) - .push_fallback_with_predicate( - forward - .push_map_target(HttpEndpoint::from) - .push_on_response(svc::layers().box_http_response().box_http_request()) - .into_inner(), - is_discovery_rejected, - ) .cache( svc::layers().push_on_response( svc::layers() @@ -492,14 +489,8 @@ impl Config { .check_new_service::<(http::Version, endpoint::TcpLogical), http::Request<_>>() .into_inner(); - let tcp_forward = svc::stack(tcp_connect.clone()) - .push_make_thunk() - .push_on_response(svc::layer::mk(tcp::Forward::new)) - .instrument(|_: &TcpEndpoint| debug_span!("forward")) - .check_new::(); - // Load balances TCP streams that cannot be decoded as HTTP. - let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve)) + let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect.clone(), resolve)) .push_on_response( svc::layers() .push_failfast(dispatch_timeout) @@ -522,24 +513,25 @@ impl Config { )) .into_inner(); - svc::stack(svc::stack::MakeSwitch::new( - skip_detect.clone(), - http, - tcp_forward - .push_map_target(TcpEndpoint::from) - .instrument(|_: &_| info_span!("tcp")) - .into_inner(), - )) - .cache( - svc::layers().push_on_response( - svc::layers() - .push_failfast(dispatch_timeout) - .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age), - ), - ) - .push_map_target(endpoint::TcpLogical::from) - .push(metrics.transport.layer_accept(TransportLabels)) - .into_inner() + let tcp = svc::stack(tcp_connect) + .push_make_thunk() + .push_on_response(svc::layer::mk(tcp::Forward::new)) + .instrument(|_: &TcpEndpoint| debug_span!("tcp.forward")) + .check_new::() + .push_map_target(TcpEndpoint::from) + .into_inner(); + + svc::stack(svc::stack::MakeSwitch::new(skip_detect.clone(), http, tcp)) + .cache( + svc::layers().push_on_response( + svc::layers() + .push_failfast(dispatch_timeout) + .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age), + ), + ) + .push_map_target(endpoint::TcpLogical::from) + .push(metrics.transport.layer_accept(TransportLabels)) + .into_inner() } } @@ -582,16 +574,6 @@ pub fn trace_labels() -> HashMap { l } -fn is_discovery_rejected(err: &Error) -> bool { - fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool { - err.is::() || err.source().map(is_rejected).unwrap_or(false) - } - - let rejected = is_rejected(&**err); - tracing::debug!(rejected, %err); - rejected -} - fn is_loop(err: &(dyn std::error::Error + 'static)) -> bool { err.is::() || err.source().map(is_loop).unwrap_or(false) } diff --git a/linkerd/app/src/dst/default_profile.rs b/linkerd/app/src/dst/default_profile.rs new file mode 100644 index 0000000000..f316396458 --- /dev/null +++ b/linkerd/app/src/dst/default_profile.rs @@ -0,0 +1,42 @@ +use super::Rejected; +use futures::prelude::*; +use linkerd2_app_core::{profiles, svc, Error}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tracing::debug; + +pub fn layer() -> impl svc::Layer> + Clone { + svc::layer::mk(RecoverDefaultProfile) +} + +/// Wraps a `GetProfile` to produce no profile when the lookup is rejected. +#[derive(Clone, Debug)] +pub struct RecoverDefaultProfile(S); + +impl tower::Service for RecoverDefaultProfile +where + S: profiles::GetProfile, + S::Future: Send + 'static, +{ + type Response = Option; + type Error = Error; + type Future = + Pin, Error>> + Send + 'static>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: T) -> Self::Future { + Box::pin(self.0.get_profile(dst).or_else(|err| { + if Rejected::matches(&*err) { + debug!("Handling rejected discovery"); + future::ok(None) + } else { + future::err(err) + } + })) + } +} diff --git a/linkerd/app/src/dst/default_resolve.rs b/linkerd/app/src/dst/default_resolve.rs index 61ac3a5acd..37949d1695 100644 --- a/linkerd/app/src/dst/default_resolve.rs +++ b/linkerd/app/src/dst/default_resolve.rs @@ -14,6 +14,8 @@ pub fn layer() -> impl svc::Layer> + Cl svc::layer::mk(RecoverDefaultResolve) } +/// Wraps a `Resolve` to produce a default resolution when the resolution is +/// rejected. #[derive(Clone, Debug)] pub struct RecoverDefaultResolve(S); @@ -24,7 +26,7 @@ where S::Endpoint: Default + Send + 'static, S::Resolution: Send + 'static, S::Future: Send + 'static, - stream::Once, S::Error>>>: + stream::Once, Error>>>: stream::TryStream, Error = S::Error>, { type Response = future::Either< @@ -34,7 +36,7 @@ where type Error = Error; type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 92b449b36b..a2f1b19a64 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -1,14 +1,16 @@ +mod default_profile; mod default_resolve; mod permit; mod resolve; +use self::default_profile::RecoverDefaultProfile; use self::default_resolve::RecoverDefaultResolve; use indexmap::IndexSet; use linkerd2_app_core::{ control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, - Addr, ControlHttpMetrics, Error, + ControlHttpMetrics, Error, }; -use permit::PermitConfiguredDsts; +use permit::{PermitProfile, PermitResolve}; use std::time::Duration; use tonic::body::BoxBody; @@ -23,20 +25,26 @@ pub struct Config { pub initial_profile_timeout: Duration, } +/// Indicates that discovery was rejected due to configuration. #[derive(Clone, Debug)] -pub struct Rejected(()); +struct Rejected(()); /// Handles to destination service clients. -/// -/// The addr is preserved for logging. pub struct Dst { + /// The address of the destination service, used for logging. pub addr: control::ControlAddr, - pub profiles: RequestFilter< - PermitConfiguredDsts, - profiles::Client, resolve::BackoffUnlessInvalidArgument>, + + /// Resolves profiles. + pub profiles: RecoverDefaultProfile< + RequestFilter< + PermitProfile, + profiles::Client, resolve::BackoffUnlessInvalidArgument>, + >, >, + + /// Resolves endpoints. pub resolve: RecoverDefaultResolve< - RequestFilter>>, + RequestFilter>>, >, } @@ -51,7 +59,7 @@ impl Config { let backoff = self.control.connect.backoff.clone(); let svc = self.control.build(dns, metrics, identity); let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff)) - .push(RequestFilter::layer(PermitConfiguredDsts::new( + .push(RequestFilter::layer(PermitResolve::new( self.get_suffixes, self.get_networks, ))) @@ -64,10 +72,11 @@ impl Config { self.initial_profile_timeout, self.context, )) - .push(RequestFilter::layer( - PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks) - .with_error::(), - )) + .push(RequestFilter::layer(PermitProfile::new( + self.profile_suffixes, + self.profile_networks, + ))) + .push(default_profile::layer()) .into_inner(); Ok(Dst { @@ -78,21 +87,11 @@ impl Config { } } -impl std::fmt::Display for Rejected { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "rejected discovery") - } -} - -impl std::error::Error for Rejected {} - -impl From for Rejected { - fn from(_: Addr) -> Self { - Rejected(()) - } -} +// === impl Rejected === impl Rejected { + /// Checks whether discovery was rejected, either due to configuration or by + /// the destination service. fn matches(err: &(dyn std::error::Error + 'static)) -> bool { if err.is::() { return true; @@ -105,3 +104,11 @@ impl Rejected { err.source().map(Self::matches).unwrap_or(false) } } + +impl std::fmt::Display for Rejected { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "rejected discovery") + } +} + +impl std::error::Error for Rejected {} diff --git a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs index bf05c17249..688c2e7718 100644 --- a/linkerd/app/src/dst/permit.rs +++ b/linkerd/app/src/dst/permit.rs @@ -1,79 +1,100 @@ use super::Rejected; use ipnet::{Contains, IpNet}; use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error}; -use std::marker::PhantomData; -use std::net::IpAddr; -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; -pub struct PermitConfiguredDsts { - name_suffixes: Arc>, +/// Rejects profile lookups if the destination address is outside of the +/// configured networks/domains. +#[derive(Clone, Debug)] +pub struct PermitProfile(Inner); + +/// Rejects endpoint resolutions if the destinatino address is outside of the +/// configured networks/domains or if there is no resolvable concrete address. +#[derive(Clone, Debug)] +pub struct PermitResolve(Inner); + +#[derive(Clone, Debug)] +struct Inner { + names: Arc>, networks: Arc>, - _error: PhantomData, } -// === impl PermitConfiguredDsts === +// === impl PermitProfile === -impl PermitConfiguredDsts { - pub fn new( +impl PermitProfile { + pub(super) fn new( name_suffixes: impl IntoIterator, nets: impl IntoIterator, ) -> Self { - Self { - name_suffixes: Arc::new(name_suffixes.into_iter().collect()), + Self(Inner { + names: Arc::new(name_suffixes.into_iter().collect()), networks: Arc::new(nets.into_iter().collect()), - _error: PhantomData, - } + }) } +} - /// Configures the returned error type when the target is outside of the - /// configured set of destinations. - pub fn with_error(self) -> PermitConfiguredDsts - where - E: Into + From, - { - PermitConfiguredDsts { - name_suffixes: self.name_suffixes, - networks: self.networks, - _error: PhantomData, +impl FilterRequest for PermitProfile +where + for<'t> &'t T: Into, +{ + type Request = Addr; + + fn filter(&self, t: T) -> Result { + let addr = (&t).into(); + let permitted = self.0.matches(&addr); + tracing::debug!(permitted, "Profile"); + if permitted { + Ok(addr) + } else { + Err(Rejected(()).into()) } } } -impl Clone for PermitConfiguredDsts { - fn clone(&self) -> Self { - Self { - name_suffixes: self.name_suffixes.clone(), - networks: self.networks.clone(), - _error: PhantomData, - } +// === impl PermitResolve === + +impl PermitResolve { + pub(super) fn new( + name_suffixes: impl IntoIterator, + nets: impl IntoIterator, + ) -> Self { + Self(Inner { + names: Arc::new(name_suffixes.into_iter().collect()), + networks: Arc::new(nets.into_iter().collect()), + }) } } -impl FilterRequest for PermitConfiguredDsts +impl FilterRequest for PermitResolve where - E: Into + From, - for<'t> &'t T: Into, + for<'t> &'t T: Into>, { - type Request = T; + type Request = Addr; - fn filter(&self, t: T) -> Result { - let addr = (&t).into(); - let permitted = match addr { - Addr::Name(ref name) => self - .name_suffixes - .iter() - .any(|suffix| suffix.contains(name.name())), + fn filter(&self, t: T) -> Result { + let permitted = (&t).into().and_then(|addr| { + if self.0.matches(&addr) { + Some(addr) + } else { + None + } + }); + tracing::debug!(permitted = %permitted.is_some(), "Resolve"); + permitted.ok_or_else(|| Rejected(()).into()) + } +} + +// === impl Inner === + +impl Inner { + fn matches(&self, addr: &Addr) -> bool { + match addr { + Addr::Name(ref name) => self.names.iter().any(|sfx| sfx.contains(name.name())), Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) { (IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr), (IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr), _ => false, }), - }; - - if permitted { - Ok(t) - } else { - Err(E::from(addr).into()) } } } diff --git a/linkerd/app/src/dst/resolve.rs b/linkerd/app/src/dst/resolve.rs index 2155dde771..a724a965ee 100644 --- a/linkerd/app/src/dst/resolve.rs +++ b/linkerd/app/src/dst/resolve.rs @@ -1,4 +1,4 @@ -pub use super::permit::PermitConfiguredDsts; +pub use super::permit::PermitResolve; use http_body::Body as HttpBody; use linkerd2_app_core::{ exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, diff --git a/linkerd/proxy/discover/src/buffer.rs b/linkerd/proxy/discover/src/buffer.rs index 9a26599992..0dcc860912 100644 --- a/linkerd/proxy/discover/src/buffer.rs +++ b/linkerd/proxy/discover/src/buffer.rs @@ -1,7 +1,6 @@ use futures::{ready, Stream, TryFuture}; use linkerd2_error::{Error, Never}; use pin_project::pin_project; -use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -60,7 +59,6 @@ impl Buffer { impl tower::Service for Buffer where - T: fmt::Display, M: tower::Service, D: discover::Discover + Send + 'static, D::Error: Into, diff --git a/linkerd/service-profiles/src/client.rs b/linkerd/service-profiles/src/client.rs index 28e58702a9..c87c0c02d3 100644 --- a/linkerd/service-profiles/src/client.rs +++ b/linkerd/service-profiles/src/client.rs @@ -37,9 +37,6 @@ pub struct Client { context_token: String, } -#[derive(Clone, Debug)] -pub struct InvalidProfileAddr(Addr); - #[pin_project] pub struct ProfileFuture where @@ -126,7 +123,7 @@ where R: Recover + Send + Clone + 'static, R::Backoff: Unpin + Send, { - type Response = Receiver; + type Response = Option; type Error = Error; type Future = ProfileFuture; @@ -142,8 +139,6 @@ where ..Default::default() }; - let timeout = time::delay_for(self.initial_timeout); - let inner = Inner { request, service: self.service.clone(), @@ -152,7 +147,7 @@ where }; ProfileFuture { inner: Some(inner), - timeout, + timeout: time::delay_for(self.initial_timeout), } } } @@ -168,7 +163,7 @@ where R::Backoff: Unpin, R::Backoff: Send, { - type Output = Result; + type Output = Result, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -217,7 +212,7 @@ where trace!(?profile, "publishing"); if tx.broadcast(profile).is_err() { trace!("failed to publish profile"); - return + return; } } } @@ -227,7 +222,7 @@ where }; tokio::spawn(daemon.in_current_span()); - Poll::Ready(Ok(rx)) + Poll::Ready(Ok(Some(rx))) } } @@ -526,23 +521,3 @@ mod tests { } } } - -impl InvalidProfileAddr { - pub fn addr(&self) -> &Addr { - &self.0 - } -} - -impl std::fmt::Display for InvalidProfileAddr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "invalid profile addr: {}", self.0) - } -} - -impl std::error::Error for InvalidProfileAddr {} - -impl From for InvalidProfileAddr { - fn from(addr: Addr) -> Self { - Self(addr) - } -} diff --git a/linkerd/service-profiles/src/discover.rs b/linkerd/service-profiles/src/discover.rs index 11e14bb435..5b5ea73072 100644 --- a/linkerd/service-profiles/src/discover.rs +++ b/linkerd/service-profiles/src/discover.rs @@ -24,7 +24,7 @@ where G: GetProfile, G::Future: Send + 'static, G::Error: Send, - M: NewService<(Receiver, T)> + Clone + Send + 'static, + M: NewService<(Option, T)> + Clone + Send + 'static, { type Service = FutureService< Pin> + Send + 'static>>, diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs index a30bb5c388..22372909ad 100644 --- a/linkerd/service-profiles/src/http/route_request.rs +++ b/linkerd/service-profiles/src/http/route_request.rs @@ -33,7 +33,7 @@ pub struct NewRouteRequest { pub struct RouteRequest { target: T, - rx: Receiver, + rx: Option, inner: S, new_route: N, http_routes: Vec<(RequestMatch, Route)>, @@ -54,14 +54,15 @@ impl Clone for NewRouteRequest { impl NewService for NewRouteRequest where - T: AsRef + Clone, + T: Clone, + for<'t> &'t T: Into>, M: NewService, N: NewService<(Route, T)> + Clone, { type Service = RouteRequest; fn new_service(&mut self, target: T) -> Self::Service { - let rx = target.as_ref().clone(); + let rx = (&target).into(); let inner = self.inner.new_service(target.clone()); let default = self .new_route @@ -93,9 +94,12 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut update = None; - while let Poll::Ready(Some(up)) = self.rx.poll_recv_ref(cx) { - update = Some(up.clone()); + if let Some(rx) = self.rx.as_mut() { + while let Poll::Ready(Some(up)) = rx.poll_recv_ref(cx) { + update = Some(up.clone()); + } } + // Every time the profile updates, rebuild the distribution, reusing // services that existed in the prior state. if let Some(Profile { http_routes, .. }) = update { diff --git a/linkerd/service-profiles/src/lib.rs b/linkerd/service-profiles/src/lib.rs index b58e7342e9..e9c3be244b 100644 --- a/linkerd/service-profiles/src/lib.rs +++ b/linkerd/service-profiles/src/lib.rs @@ -14,7 +14,7 @@ pub mod discover; pub mod http; pub mod split; -pub use self::client::{Client, InvalidProfileAddr}; +pub use self::client::Client; pub type Receiver = tokio::sync::watch::Receiver; @@ -37,7 +37,7 @@ pub struct GetProfileService

(P); /// Watches a destination's Profile. pub trait GetProfile { type Error: Into; - type Future: Future>; + type Future: Future, Self::Error>>; fn get_profile(&mut self, target: T) -> Self::Future; @@ -51,7 +51,7 @@ pub trait GetProfile { impl GetProfile for S where - S: tower::Service + Clone, + S: tower::Service> + Clone, S::Error: Into, { type Error = S::Error; @@ -66,7 +66,7 @@ impl tower::Service for GetProfileService

where P: GetProfile, { - type Response = Receiver; + type Response = Option; type Error = P::Error; type Future = P::Future; diff --git a/linkerd/service-profiles/src/split.rs b/linkerd/service-profiles/src/split.rs index d1687bfa82..87f6acff1c 100644 --- a/linkerd/service-profiles/src/split.rs +++ b/linkerd/service-profiles/src/split.rs @@ -34,20 +34,25 @@ pub struct NewSplit { #[derive(Debug)] pub struct Split { - target: T, - rx: Receiver, - new_service: N, - rng: SmallRng, - inner: Option, - services: ReadyCache, + inner: Inner, } #[derive(Debug)] -struct Inner { - distribution: WeightedIndex, - addrs: IndexSet, +enum Inner { + Default(S), + Split { + rng: SmallRng, + rx: Receiver, + target: T, + new_service: N, + distribution: WeightedIndex, + addrs: IndexSet, + services: ReadyCache, + }, } +// === impl NewSplit === + impl Clone for NewSplit { fn clone(&self) -> Self { Self { @@ -58,31 +63,75 @@ impl Clone for NewSplit { } } -impl NewService for NewSplit +impl NewService for NewSplit where - T: AsRef, + T: Clone, + for<'t> &'t T: Into + Into>, + N: NewService<(Option, T), Service = S> + Clone, S: tower::Service, + S::Error: Into, { type Service = Split; fn new_service(&mut self, target: T) -> Self::Service { - let rx = target.as_ref().clone(); - Split { - rx, - target, - new_service: self.inner.clone(), - rng: self.rng.clone(), - inner: None, - services: ReadyCache::default(), - } + // If there is a profile, it is used to configure one or more inner + // services and a concrete address is provided so that the endpoint + // discovery is performed. + // + // Otherwise, profile lookup was rejected and, therefore, no concrete + // address is provided. + let inner = match Into::>::into(&target) { + None => { + trace!("Building default service"); + Inner::Default(self.inner.new_service((None, target))) + } + Some(rx) => { + let mut targets = rx.borrow().targets.clone(); + if targets.len() == 0 { + targets.push(Target { + addr: (&target).into(), + weight: 1, + }) + } + trace!(?targets, "Building split service"); + + let mut addrs = IndexSet::with_capacity(targets.len()); + let mut weights = Vec::with_capacity(targets.len()); + let mut services = ReadyCache::default(); + let mut new_service = self.inner.clone(); + for Target { weight, addr } in targets.into_iter() { + services.push( + addr.clone(), + new_service.new_service((Some(addr.clone()), target.clone())), + ); + addrs.insert(addr); + weights.push(weight); + } + + Inner::Split { + rx, + target, + new_service, + services, + addrs, + distribution: WeightedIndex::new(weights).unwrap(), + rng: self.rng.clone(), + } + } + }; + + Split { inner } } } +// === impl Split === + impl tower::Service for Split where Req: Send + 'static, - T: AsRef + Clone, - N: NewService<(Addr, T), Service = S> + Clone, + T: Clone, + for<'t> &'t T: Into, + N: NewService<(Option, T), Service = S> + Clone, S: tower::Service + Send + 'static, S::Response: Send + 'static, S::Error: Into, @@ -93,116 +142,90 @@ where type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut update = None; - while let Poll::Ready(Some(up)) = self.rx.poll_recv_ref(cx) { - update = Some(up.clone()); - } - // Every time the profile updates, rebuild the distribution, reusing - // services that existed in the prior state. - if let Some(Profile { targets, .. }) = update { - debug!(?targets, "Updating"); - self.update_inner(targets); - } + match self.inner { + Inner::Default(ref mut svc) => svc.poll_ready(cx).map_err(Into::into), + Inner::Split { + ref mut rx, + ref mut services, + ref mut addrs, + ref mut distribution, + ref mut new_service, + ref target, + .. + } => { + let mut update = None; + while let Poll::Ready(Some(up)) = rx.poll_recv_ref(cx) { + update = Some(up.clone()); + } - // If, somehow, the watch hasn't been notified at least once, build the - // default target. This shouldn't actually be exercised, though. - if self.inner.is_none() { - self.update_inner(Vec::new()); - } - debug_assert_ne!(self.services.len(), 0); + // Every time the profile updates, rebuild the distribution, reusing + // services that existed in the prior state. + if let Some(Profile { mut targets, .. }) = update { + if targets.len() == 0 { + targets.push(Target { + addr: target.into(), + weight: 1, + }) + } + debug!(?targets, "Updating"); + + // Replace the old set of addresses with an empty set. The + // prior set is used to determine whether a new service + // needs to be created and what stale services should be + // removed. + let mut prior_addrs = + std::mem::replace(addrs, IndexSet::with_capacity(targets.len())); + let mut weights = Vec::with_capacity(targets.len()); + + // Create an updated distribution and set of services. + for Target { weight, addr } in targets.into_iter() { + // Reuse the prior services whenever possible. + if !prior_addrs.remove(&addr) { + debug!(%addr, "Creating target"); + let svc = new_service.new_service((Some(addr.clone()), target.clone())); + services.push(addr.clone(), svc); + } else { + trace!(%addr, "Target already exists"); + } + addrs.insert(addr); + weights.push(weight); + } + + *distribution = WeightedIndex::new(weights).unwrap(); + + // Remove all prior services that did not exist in the new + // set of targets. + for addr in prior_addrs.into_iter() { + services.evict(&addr); + } + } - // Wait for all target services to be ready. If any services fail, then - // the whole service fails. - Poll::Ready(ready!(self.services.poll_pending(cx)).map_err(Into::into)) + // Wait for all target services to be ready. If any services fail, then + // the whole service fails. + Poll::Ready(ready!(services.poll_pending(cx)).map_err(Into::into)) + } + } } fn call(&mut self, req: Req) -> Self::Future { - let Inner { - ref addrs, - ref distribution, - } = self.inner.as_ref().expect("Called before ready"); - debug_assert_ne!(addrs.len(), 0, "addrs empty"); - debug_assert_eq!(self.services.len(), addrs.len()); - - let idx = if addrs.len() == 1 { - 0 - } else { - distribution.sample(&mut self.rng) - }; - let addr = addrs.get_index(idx).expect("invalid index"); - trace!(%addr, "Dispatching"); - Box::pin(self.services.call_ready(addr, req).err_into::()) - } -} - -impl Split -where - Req: Send + 'static, - T: AsRef + Clone, - N: NewService<(Addr, T), Service = S> + Clone, - S: tower::Service + Send + 'static, - S::Response: Send + 'static, - S::Error: Into, - S::Future: Send, -{ - fn update_inner(&mut self, targets: Vec) { - // Clear out the prior state and preserve its services for reuse. - let mut prior = self.inner.take().map(|i| i.addrs).unwrap_or_default(); - - let mut addrs = IndexSet::with_capacity(targets.len().max(0)); - let mut weights = Vec::with_capacity(targets.len().max(1)); - if targets.len() == 0 { - // If there were no overrides, build a default backend from the - // target. - let addr = self.target.as_ref(); - if !prior.remove(addr) { - debug!(%addr, "Creating default target"); - let svc = self - .new_service - .new_service((addr.clone(), self.target.clone())); - self.services.push(addr.clone(), svc); - } else { - debug!(%addr, "Default target already exists"); - } - addrs.insert(addr.clone()); - weights.push(1); - } else { - // Create an updated distribution and set of services. - for Target { weight, addr } in targets.into_iter() { - // Reuse the prior services whenever possible. - if !prior.remove(&addr) { - debug!(%addr, "Creating target"); - let svc = self - .new_service - .new_service((addr.clone(), self.target.clone())); - self.services.push(addr.clone(), svc); + match self.inner { + Inner::Default(ref mut svc) => Box::pin(svc.call(req).err_into::()), + Inner::Split { + ref addrs, + ref distribution, + ref mut services, + ref mut rng, + .. + } => { + let idx = if addrs.len() == 1 { + 0 } else { - debug!(%addr, "Target already exists"); - } - addrs.insert(addr); - weights.push(weight); + distribution.sample(rng) + }; + let addr = addrs.get_index(idx).expect("invalid index"); + trace!(?addr, "Dispatching"); + Box::pin(services.call_ready(addr, req).err_into::()) } } - - for addr in prior { - self.services.evict(&addr); - } - if !addrs.contains(self.target.as_ref()) { - self.services.evict(self.target.as_ref()); - } - - debug_assert_ne!(addrs.len(), 0, "addrs empty"); - debug_assert_eq!(addrs.len(), weights.len(), "addrs does not match weights"); - // The cache may still contain evicted pending services until the next - // poll. - debug_assert!( - addrs.len() <= self.services.len(), - "addrs does not match the number of services" - ); - let distribution = WeightedIndex::new(weights).expect("Split must be valid"); - self.inner = Some(Inner { - addrs, - distribution, - }); } }