diff --git a/linkerd/app/inbound/src/endpoint.rs b/linkerd/app/inbound/src/endpoint.rs index 8b42717f34..4f469a8266 100644 --- a/linkerd/app/inbound/src/endpoint.rs +++ b/linkerd/app/inbound/src/endpoint.rs @@ -7,7 +7,7 @@ use linkerd2_app_core::{ transport::{listen, tls}, Addr, Conditional, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, }; -use std::{convert::TryInto, fmt, net::SocketAddr, sync::Arc}; +use std::{convert::TryInto, net::SocketAddr, sync::Arc}; use tracing::debug; #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -21,7 +21,7 @@ pub struct Target { #[derive(Clone, Debug)] pub struct Logical { target: Target, - profiles: profiles::Receiver, + profiles: Option, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -114,18 +114,13 @@ pub(super) fn route((route, logical): (profiles::http::Route, Logical)) -> dst:: // === impl Target === +/// Used for profile discovery. impl Into for &'_ Target { fn into(self) -> Addr { self.dst.clone() } } -impl AsRef for Target { - fn as_ref(&self) -> &Addr { - &self.dst - } -} - impl tls::HasPeerIdentity for Target { fn peer_identity(&self) -> tls::PeerIdentity { Conditional::None(tls::ReasonForNoPeerName::Loopback.into()) @@ -197,12 +192,6 @@ impl tap::Inspect for Target { } } -impl fmt::Display for Target { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.dst.fmt(f) - } -} - impl stack_tracing::GetSpan<()> for Target { fn get_span(&self, _: &()) -> tracing::Span { use tracing::info_span; @@ -275,14 +264,14 @@ impl From for Target { // === impl Logical === -impl From<(profiles::Receiver, Target)> for Logical { - fn from((profiles, target): (profiles::Receiver, Target)) -> Self { +impl From<(Option, Target)> for Logical { + fn from((profiles, target): (Option, Target)) -> Self { Self { profiles, target } } } -impl AsRef for Logical { - fn as_ref(&self) -> &profiles::Receiver { - &self.profiles +impl Into> for &'_ Logical { + fn into(self) -> Option { + self.profiles.clone() } } diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs index fbb0e9f346..e92de2644b 100644 --- a/linkerd/app/inbound/src/lib.rs +++ b/linkerd/app/inbound/src/lib.rs @@ -248,14 +248,9 @@ impl Config { .push_on_response(svc::layers().box_http_response()) .check_new_service::>(); - let forward = target - .instrument(|_: &Target| debug_span!("forward")) - .check_new_service::>(); - // Attempts to resolve the target as a service profile or, if that // fails, skips that stack to forward to the local endpoint. profile - .push_fallback(forward) .check_new_service::>() // If the traffic is targeted at the inbound port, send it through // the loopback service (i.e. as a gateway). diff --git a/linkerd/app/outbound/src/endpoint.rs b/linkerd/app/outbound/src/endpoint.rs index 07e84a722a..ef4d1297d4 100644 --- a/linkerd/app/outbound/src/endpoint.rs +++ b/linkerd/app/outbound/src/endpoint.rs @@ -1,8 +1,7 @@ use crate::http::uri::Authority; use indexmap::IndexMap; use linkerd2_app_core::{ - dst, - metric_labels, + dst, metric_labels, metric_labels::{prefix_labels, EndpointLabels, TlsStatus}, profiles, proxy::{ @@ -15,8 +14,7 @@ use linkerd2_app_core::{ }, router, transport::{listen, tls}, - Addr, - Conditional, //L5D_REQUIRE_ID, + Addr, Conditional, }; use std::{net::SocketAddr, sync::Arc}; @@ -38,7 +36,7 @@ pub struct HttpLogical { #[derive(Clone, Debug, Eq, PartialEq)] pub struct HttpConcrete { - pub dst: Addr, + pub resolve: Option, pub logical: HttpLogical, } @@ -47,7 +45,7 @@ pub struct LogicalPerRequest(HttpAccept); #[derive(Clone, Debug)] pub struct Profile { - pub rx: profiles::Receiver, + pub rx: Option, pub logical: HttpLogical, } @@ -81,21 +79,17 @@ impl From for TcpLogical { } } +/// Used as a default destination when resolution is rejected. impl Into for &'_ TcpLogical { fn into(self) -> SocketAddr { self.addr } } -impl Into for &'_ TcpLogical { - fn into(self) -> Addr { - self.addr.into() - } -} - -impl std::fmt::Display for TcpLogical { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.addr.fmt(f) +/// Used to resolve endpoints. +impl Into> for &'_ TcpLogical { + fn into(self) -> Option { + Some(self.addr.into()) } } @@ -118,102 +112,63 @@ impl Into for &'_ HttpAccept { // === impl HttpConrete === -impl From<(Addr, Profile)> for HttpConcrete { - fn from((dst, Profile { logical, .. }): (Addr, Profile)) -> Self { - Self { dst, logical } - } -} - -impl AsRef for HttpConcrete { - fn as_ref(&self) -> &Addr { - &self.dst +impl From<(Option, Profile)> for HttpConcrete { + fn from((resolve, Profile { logical, .. }): (Option, Profile)) -> Self { + Self { resolve, logical } } } -impl Into for &'_ HttpConcrete { - fn into(self) -> Addr { - self.dst.clone() +/// Produces an address to resolve to individual endpoints. This address is only +/// present if the initial profile resolution was not rejected. +impl Into> for &'_ HttpConcrete { + fn into(self) -> Option { + self.resolve.clone() } } +/// Produces an address to be used if resolution is rejected. impl Into for &'_ HttpConcrete { fn into(self) -> SocketAddr { - self.dst - .socket_addr() + self.resolve + .as_ref() + .and_then(|a| a.socket_addr()) .unwrap_or_else(|| self.logical.orig_dst) } } -impl std::fmt::Display for HttpConcrete { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.dst.fmt(f) - } -} - -impl From for HttpConcrete { - fn from(logical: HttpLogical) -> Self { - Self { - dst: logical.dst.clone(), - logical, - } - } -} - // === impl HttpLogical === -impl std::fmt::Display for HttpLogical { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.dst.fmt(f) - } -} - -impl<'t> From<&'t HttpLogical> for http::header::HeaderValue { - fn from(target: &'t HttpLogical) -> Self { - http::header::HeaderValue::from_str(&target.dst.to_string()) - .expect("addr must be a valid header") - } -} - -impl Into for HttpLogical { - fn into(self) -> SocketAddr { - self.orig_dst - } -} - +/// Produces an address for profile discovery. impl Into for &'_ HttpLogical { fn into(self) -> Addr { self.dst.clone() } } +/// Needed for canonicalization. impl AsRef for HttpLogical { fn as_ref(&self) -> &Addr { &self.dst } } +/// Needed for canonicalization. impl AsMut for HttpLogical { fn as_mut(&mut self) -> &mut Addr { &mut self.dst } } -// === impl HttpEndpoint === - -impl From for HttpEndpoint { - fn from(logical: HttpLogical) -> Self { - Self { - addr: logical.orig_dst, - settings: logical.version.into(), - identity: Conditional::None( - tls::ReasonForNoPeerName::NotProvidedByServiceDiscovery.into(), - ), - concrete: logical.into(), - metadata: Metadata::default(), - } +// Used to set the l5d-canonical-dst header. +impl<'t> From<&'t HttpLogical> for http::header::HeaderValue { + fn from(target: &'t HttpLogical) -> Self { + http::header::HeaderValue::from_str(&target.dst.to_string()) + .expect("addr must be a valid header") } } +// === impl HttpEndpoint === + impl std::hash::Hash for HttpEndpoint { fn hash(&self, state: &mut H) { self.addr.hash(state); @@ -461,8 +416,8 @@ pub fn route((route, profile): (profiles::http::Route, Profile)) -> dst::Route { // === impl Profile === -impl From<(profiles::Receiver, HttpLogical)> for Profile { - fn from((rx, logical): (profiles::Receiver, HttpLogical)) -> Self { +impl From<(Option, HttpLogical)> for Profile { + fn from((rx, logical): (Option, HttpLogical)) -> Self { Self { rx, logical } } } @@ -473,14 +428,14 @@ impl AsRef for Profile { } } -impl AsRef for Profile { - fn as_ref(&self) -> &profiles::Receiver { - &self.rx +impl Into for &'_ Profile { + fn into(self) -> Addr { + self.logical.dst.clone() } } -impl From for HttpLogical { - fn from(Profile { logical, .. }: Profile) -> Self { - logical +impl Into> for &'_ Profile { + fn into(self) -> Option { + self.rx.clone() } } diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs index e047dc1752..77eda9d1e0 100644 --- a/linkerd/app/outbound/src/lib.rs +++ b/linkerd/app/outbound/src/lib.rs @@ -20,7 +20,7 @@ use linkerd2_app_core::{ spans::SpanConverter, svc::{self}, transport::{self, listen, tls}, - Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER, + Addr, Conditional, Error, ProxyMetrics, StackMetrics, TraceContextLayer, CANONICAL_DST_HEADER, DST_OVERRIDE_HEADER, L5D_REQUIRE_ID, }; use std::{collections::HashMap, net, time::Duration}; @@ -295,7 +295,7 @@ impl Config { .push(discover::buffer(1_000, cache_max_idle_age)); // Builds a balancer for each concrete destination. - let concrete = svc::stack(endpoint.clone()) + let concrete = svc::stack(endpoint) .check_new_service::>() .push_on_response( svc::layers() @@ -314,8 +314,11 @@ impl Config { .push_failfast(dispatch_timeout) .push(metrics.stack.layer(stack_labels("concrete"))), ) + .instrument(|c: &HttpConcrete| match c.resolve.as_ref() { + None => info_span!("concrete"), + Some(addr) => info_span!("concrete", %addr), + }) .into_new_service() - .instrument(|c: &HttpConcrete| info_span!("concrete", dst = %c.dst)) .check_new_service::>(); // For each logical target, performs service profile resolution and @@ -328,14 +331,24 @@ impl Config { // When no new requests have been dispatched for `cache_max_idle_age`, // the cached service is dropped. In-flight streams will continue to be // processed. - let logical = concrete + concrete // Uses the split-provided target `Addr` to build a concrete target. + .check_new_service::>() .push_map_target(HttpConcrete::from) .push_on_response(svc::layers().push(svc::layer::mk(svc::SpawnReady::new))) + // The concrete address is only set when the profile could be + // resolved. Endpoint resolution is skipped when there is no + // concrete address. + .check_new_service::<(Option, endpoint::Profile), http::Request<_>>() .push(profiles::split::layer()) + .check_new_service::>() // Drives concrete stacks to readiness and makes the split // cloneable, as required by the retry middleware. - .push_on_response(svc::layers().push_spawn_buffer(buffer_capacity)) + .push_on_response( + svc::layers() + .push_failfast(dispatch_timeout) + .push_spawn_buffer(buffer_capacity), + ) .push(profiles::http::route_request::layer( svc::proxies() .push(metrics.http_route_actual.into_layer::()) @@ -351,29 +364,13 @@ impl Config { .push_map_target(endpoint::route) .into_inner(), )) + .check_new_service::>() .push_map_target(endpoint::Profile::from) // Discovers the service profile from the control plane and passes // it to inner stack to build the router and traffic split. .push(profiles::discover::layer(profiles_client)) - .check_new_service::>(); - - // Caches clients that bypass discovery/balancing. - let forward = svc::stack(endpoint) - .instrument(|t: &HttpEndpoint| debug_span!("forward", peer.id = ?t.identity)) - .check_new_service::>(); - - // Attempts to route route request to a logical services that uses - // control plane for discovery. If the discovery is rejected, the - // `forward` stack is used instead, bypassing load balancing, etc. - logical + .check_new_service::>() .push_on_response(svc::layers().box_http_response()) - .push_fallback_with_predicate( - forward - .push_map_target(HttpEndpoint::from) - .push_on_response(svc::layers().box_http_response().box_http_request()) - .into_inner(), - is_discovery_rejected, - ) .cache( svc::layers().push_on_response( svc::layers() @@ -492,14 +489,8 @@ impl Config { .check_new_service::<(http::Version, endpoint::TcpLogical), http::Request<_>>() .into_inner(); - let tcp_forward = svc::stack(tcp_connect.clone()) - .push_make_thunk() - .push_on_response(svc::layer::mk(tcp::Forward::new)) - .instrument(|_: &TcpEndpoint| debug_span!("forward")) - .check_new::(); - // Load balances TCP streams that cannot be decoded as HTTP. - let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect, resolve)) + let tcp_balance = svc::stack(self.build_tcp_balance(tcp_connect.clone(), resolve)) .push_on_response( svc::layers() .push_failfast(dispatch_timeout) @@ -522,24 +513,25 @@ impl Config { )) .into_inner(); - svc::stack(svc::stack::MakeSwitch::new( - skip_detect.clone(), - http, - tcp_forward - .push_map_target(TcpEndpoint::from) - .instrument(|_: &_| info_span!("tcp")) - .into_inner(), - )) - .cache( - svc::layers().push_on_response( - svc::layers() - .push_failfast(dispatch_timeout) - .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age), - ), - ) - .push_map_target(endpoint::TcpLogical::from) - .push(metrics.transport.layer_accept(TransportLabels)) - .into_inner() + let tcp = svc::stack(tcp_connect) + .push_make_thunk() + .push_on_response(svc::layer::mk(tcp::Forward::new)) + .instrument(|_: &TcpEndpoint| debug_span!("tcp.forward")) + .check_new::() + .push_map_target(TcpEndpoint::from) + .into_inner(); + + svc::stack(svc::stack::MakeSwitch::new(skip_detect.clone(), http, tcp)) + .cache( + svc::layers().push_on_response( + svc::layers() + .push_failfast(dispatch_timeout) + .push_spawn_buffer_with_idle_timeout(buffer_capacity, cache_max_idle_age), + ), + ) + .push_map_target(endpoint::TcpLogical::from) + .push(metrics.transport.layer_accept(TransportLabels)) + .into_inner() } } @@ -582,16 +574,6 @@ pub fn trace_labels() -> HashMap { l } -fn is_discovery_rejected(err: &Error) -> bool { - fn is_rejected(err: &(dyn std::error::Error + 'static)) -> bool { - err.is::() || err.source().map(is_rejected).unwrap_or(false) - } - - let rejected = is_rejected(&**err); - tracing::debug!(rejected, %err); - rejected -} - fn is_loop(err: &(dyn std::error::Error + 'static)) -> bool { err.is::() || err.source().map(is_loop).unwrap_or(false) } diff --git a/linkerd/app/src/dst/default_profile.rs b/linkerd/app/src/dst/default_profile.rs new file mode 100644 index 0000000000..f316396458 --- /dev/null +++ b/linkerd/app/src/dst/default_profile.rs @@ -0,0 +1,42 @@ +use super::Rejected; +use futures::prelude::*; +use linkerd2_app_core::{profiles, svc, Error}; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tracing::debug; + +pub fn layer() -> impl svc::Layer> + Clone { + svc::layer::mk(RecoverDefaultProfile) +} + +/// Wraps a `GetProfile` to produce no profile when the lookup is rejected. +#[derive(Clone, Debug)] +pub struct RecoverDefaultProfile(S); + +impl tower::Service for RecoverDefaultProfile +where + S: profiles::GetProfile, + S::Future: Send + 'static, +{ + type Response = Option; + type Error = Error; + type Future = + Pin, Error>> + Send + 'static>>; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, dst: T) -> Self::Future { + Box::pin(self.0.get_profile(dst).or_else(|err| { + if Rejected::matches(&*err) { + debug!("Handling rejected discovery"); + future::ok(None) + } else { + future::err(err) + } + })) + } +} diff --git a/linkerd/app/src/dst/default_resolve.rs b/linkerd/app/src/dst/default_resolve.rs index 61ac3a5acd..37949d1695 100644 --- a/linkerd/app/src/dst/default_resolve.rs +++ b/linkerd/app/src/dst/default_resolve.rs @@ -14,6 +14,8 @@ pub fn layer() -> impl svc::Layer> + Cl svc::layer::mk(RecoverDefaultResolve) } +/// Wraps a `Resolve` to produce a default resolution when the resolution is +/// rejected. #[derive(Clone, Debug)] pub struct RecoverDefaultResolve(S); @@ -24,7 +26,7 @@ where S::Endpoint: Default + Send + 'static, S::Resolution: Send + 'static, S::Future: Send + 'static, - stream::Once, S::Error>>>: + stream::Once, Error>>>: stream::TryStream, Error = S::Error>, { type Response = future::Either< @@ -34,7 +36,7 @@ where type Error = Error; type Future = Pin> + Send + 'static>>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.0.poll_ready(cx) } diff --git a/linkerd/app/src/dst/mod.rs b/linkerd/app/src/dst/mod.rs index 92b449b36b..a2f1b19a64 100644 --- a/linkerd/app/src/dst/mod.rs +++ b/linkerd/app/src/dst/mod.rs @@ -1,14 +1,16 @@ +mod default_profile; mod default_resolve; mod permit; mod resolve; +use self::default_profile::RecoverDefaultProfile; use self::default_resolve::RecoverDefaultResolve; use indexmap::IndexSet; use linkerd2_app_core::{ control, dns, profiles, proxy::identity, request_filter::RequestFilter, svc, transport::tls, - Addr, ControlHttpMetrics, Error, + ControlHttpMetrics, Error, }; -use permit::PermitConfiguredDsts; +use permit::{PermitProfile, PermitResolve}; use std::time::Duration; use tonic::body::BoxBody; @@ -23,20 +25,26 @@ pub struct Config { pub initial_profile_timeout: Duration, } +/// Indicates that discovery was rejected due to configuration. #[derive(Clone, Debug)] -pub struct Rejected(()); +struct Rejected(()); /// Handles to destination service clients. -/// -/// The addr is preserved for logging. pub struct Dst { + /// The address of the destination service, used for logging. pub addr: control::ControlAddr, - pub profiles: RequestFilter< - PermitConfiguredDsts, - profiles::Client, resolve::BackoffUnlessInvalidArgument>, + + /// Resolves profiles. + pub profiles: RecoverDefaultProfile< + RequestFilter< + PermitProfile, + profiles::Client, resolve::BackoffUnlessInvalidArgument>, + >, >, + + /// Resolves endpoints. pub resolve: RecoverDefaultResolve< - RequestFilter>>, + RequestFilter>>, >, } @@ -51,7 +59,7 @@ impl Config { let backoff = self.control.connect.backoff.clone(); let svc = self.control.build(dns, metrics, identity); let resolve = svc::stack(resolve::new(svc.clone(), &self.context, backoff)) - .push(RequestFilter::layer(PermitConfiguredDsts::new( + .push(RequestFilter::layer(PermitResolve::new( self.get_suffixes, self.get_networks, ))) @@ -64,10 +72,11 @@ impl Config { self.initial_profile_timeout, self.context, )) - .push(RequestFilter::layer( - PermitConfiguredDsts::new(self.profile_suffixes, self.profile_networks) - .with_error::(), - )) + .push(RequestFilter::layer(PermitProfile::new( + self.profile_suffixes, + self.profile_networks, + ))) + .push(default_profile::layer()) .into_inner(); Ok(Dst { @@ -78,21 +87,11 @@ impl Config { } } -impl std::fmt::Display for Rejected { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "rejected discovery") - } -} - -impl std::error::Error for Rejected {} - -impl From for Rejected { - fn from(_: Addr) -> Self { - Rejected(()) - } -} +// === impl Rejected === impl Rejected { + /// Checks whether discovery was rejected, either due to configuration or by + /// the destination service. fn matches(err: &(dyn std::error::Error + 'static)) -> bool { if err.is::() { return true; @@ -105,3 +104,11 @@ impl Rejected { err.source().map(Self::matches).unwrap_or(false) } } + +impl std::fmt::Display for Rejected { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "rejected discovery") + } +} + +impl std::error::Error for Rejected {} diff --git a/linkerd/app/src/dst/permit.rs b/linkerd/app/src/dst/permit.rs index bf05c17249..688c2e7718 100644 --- a/linkerd/app/src/dst/permit.rs +++ b/linkerd/app/src/dst/permit.rs @@ -1,79 +1,100 @@ use super::Rejected; use ipnet::{Contains, IpNet}; use linkerd2_app_core::{dns::Suffix, request_filter::FilterRequest, Addr, Error}; -use std::marker::PhantomData; -use std::net::IpAddr; -use std::sync::Arc; +use std::{net::IpAddr, sync::Arc}; -pub struct PermitConfiguredDsts { - name_suffixes: Arc>, +/// Rejects profile lookups if the destination address is outside of the +/// configured networks/domains. +#[derive(Clone, Debug)] +pub struct PermitProfile(Inner); + +/// Rejects endpoint resolutions if the destinatino address is outside of the +/// configured networks/domains or if there is no resolvable concrete address. +#[derive(Clone, Debug)] +pub struct PermitResolve(Inner); + +#[derive(Clone, Debug)] +struct Inner { + names: Arc>, networks: Arc>, - _error: PhantomData, } -// === impl PermitConfiguredDsts === +// === impl PermitProfile === -impl PermitConfiguredDsts { - pub fn new( +impl PermitProfile { + pub(super) fn new( name_suffixes: impl IntoIterator, nets: impl IntoIterator, ) -> Self { - Self { - name_suffixes: Arc::new(name_suffixes.into_iter().collect()), + Self(Inner { + names: Arc::new(name_suffixes.into_iter().collect()), networks: Arc::new(nets.into_iter().collect()), - _error: PhantomData, - } + }) } +} - /// Configures the returned error type when the target is outside of the - /// configured set of destinations. - pub fn with_error(self) -> PermitConfiguredDsts - where - E: Into + From, - { - PermitConfiguredDsts { - name_suffixes: self.name_suffixes, - networks: self.networks, - _error: PhantomData, +impl FilterRequest for PermitProfile +where + for<'t> &'t T: Into, +{ + type Request = Addr; + + fn filter(&self, t: T) -> Result { + let addr = (&t).into(); + let permitted = self.0.matches(&addr); + tracing::debug!(permitted, "Profile"); + if permitted { + Ok(addr) + } else { + Err(Rejected(()).into()) } } } -impl Clone for PermitConfiguredDsts { - fn clone(&self) -> Self { - Self { - name_suffixes: self.name_suffixes.clone(), - networks: self.networks.clone(), - _error: PhantomData, - } +// === impl PermitResolve === + +impl PermitResolve { + pub(super) fn new( + name_suffixes: impl IntoIterator, + nets: impl IntoIterator, + ) -> Self { + Self(Inner { + names: Arc::new(name_suffixes.into_iter().collect()), + networks: Arc::new(nets.into_iter().collect()), + }) } } -impl FilterRequest for PermitConfiguredDsts +impl FilterRequest for PermitResolve where - E: Into + From, - for<'t> &'t T: Into, + for<'t> &'t T: Into>, { - type Request = T; + type Request = Addr; - fn filter(&self, t: T) -> Result { - let addr = (&t).into(); - let permitted = match addr { - Addr::Name(ref name) => self - .name_suffixes - .iter() - .any(|suffix| suffix.contains(name.name())), + fn filter(&self, t: T) -> Result { + let permitted = (&t).into().and_then(|addr| { + if self.0.matches(&addr) { + Some(addr) + } else { + None + } + }); + tracing::debug!(permitted = %permitted.is_some(), "Resolve"); + permitted.ok_or_else(|| Rejected(()).into()) + } +} + +// === impl Inner === + +impl Inner { + fn matches(&self, addr: &Addr) -> bool { + match addr { + Addr::Name(ref name) => self.names.iter().any(|sfx| sfx.contains(name.name())), Addr::Socket(sa) => self.networks.iter().any(|net| match (net, sa.ip()) { (IpNet::V4(net), IpAddr::V4(addr)) => net.contains(&addr), (IpNet::V6(net), IpAddr::V6(addr)) => net.contains(&addr), _ => false, }), - }; - - if permitted { - Ok(t) - } else { - Err(E::from(addr).into()) } } } diff --git a/linkerd/app/src/dst/resolve.rs b/linkerd/app/src/dst/resolve.rs index 2155dde771..a724a965ee 100644 --- a/linkerd/app/src/dst/resolve.rs +++ b/linkerd/app/src/dst/resolve.rs @@ -1,4 +1,4 @@ -pub use super::permit::PermitConfiguredDsts; +pub use super::permit::PermitResolve; use http_body::Body as HttpBody; use linkerd2_app_core::{ exp_backoff::{ExponentialBackoff, ExponentialBackoffStream}, diff --git a/linkerd/proxy/discover/src/buffer.rs b/linkerd/proxy/discover/src/buffer.rs index 9a26599992..0dcc860912 100644 --- a/linkerd/proxy/discover/src/buffer.rs +++ b/linkerd/proxy/discover/src/buffer.rs @@ -1,7 +1,6 @@ use futures::{ready, Stream, TryFuture}; use linkerd2_error::{Error, Never}; use pin_project::pin_project; -use std::fmt; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; @@ -60,7 +59,6 @@ impl Buffer { impl tower::Service for Buffer where - T: fmt::Display, M: tower::Service, D: discover::Discover + Send + 'static, D::Error: Into, diff --git a/linkerd/service-profiles/src/client.rs b/linkerd/service-profiles/src/client.rs index 28e58702a9..c87c0c02d3 100644 --- a/linkerd/service-profiles/src/client.rs +++ b/linkerd/service-profiles/src/client.rs @@ -37,9 +37,6 @@ pub struct Client { context_token: String, } -#[derive(Clone, Debug)] -pub struct InvalidProfileAddr(Addr); - #[pin_project] pub struct ProfileFuture where @@ -126,7 +123,7 @@ where R: Recover + Send + Clone + 'static, R::Backoff: Unpin + Send, { - type Response = Receiver; + type Response = Option; type Error = Error; type Future = ProfileFuture; @@ -142,8 +139,6 @@ where ..Default::default() }; - let timeout = time::delay_for(self.initial_timeout); - let inner = Inner { request, service: self.service.clone(), @@ -152,7 +147,7 @@ where }; ProfileFuture { inner: Some(inner), - timeout, + timeout: time::delay_for(self.initial_timeout), } } } @@ -168,7 +163,7 @@ where R::Backoff: Unpin, R::Backoff: Send, { - type Output = Result; + type Output = Result, Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); @@ -217,7 +212,7 @@ where trace!(?profile, "publishing"); if tx.broadcast(profile).is_err() { trace!("failed to publish profile"); - return + return; } } } @@ -227,7 +222,7 @@ where }; tokio::spawn(daemon.in_current_span()); - Poll::Ready(Ok(rx)) + Poll::Ready(Ok(Some(rx))) } } @@ -526,23 +521,3 @@ mod tests { } } } - -impl InvalidProfileAddr { - pub fn addr(&self) -> &Addr { - &self.0 - } -} - -impl std::fmt::Display for InvalidProfileAddr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "invalid profile addr: {}", self.0) - } -} - -impl std::error::Error for InvalidProfileAddr {} - -impl From for InvalidProfileAddr { - fn from(addr: Addr) -> Self { - Self(addr) - } -} diff --git a/linkerd/service-profiles/src/discover.rs b/linkerd/service-profiles/src/discover.rs index 11e14bb435..5b5ea73072 100644 --- a/linkerd/service-profiles/src/discover.rs +++ b/linkerd/service-profiles/src/discover.rs @@ -24,7 +24,7 @@ where G: GetProfile, G::Future: Send + 'static, G::Error: Send, - M: NewService<(Receiver, T)> + Clone + Send + 'static, + M: NewService<(Option, T)> + Clone + Send + 'static, { type Service = FutureService< Pin> + Send + 'static>>, diff --git a/linkerd/service-profiles/src/http/route_request.rs b/linkerd/service-profiles/src/http/route_request.rs index a30bb5c388..22372909ad 100644 --- a/linkerd/service-profiles/src/http/route_request.rs +++ b/linkerd/service-profiles/src/http/route_request.rs @@ -33,7 +33,7 @@ pub struct NewRouteRequest { pub struct RouteRequest { target: T, - rx: Receiver, + rx: Option, inner: S, new_route: N, http_routes: Vec<(RequestMatch, Route)>, @@ -54,14 +54,15 @@ impl Clone for NewRouteRequest { impl NewService for NewRouteRequest where - T: AsRef + Clone, + T: Clone, + for<'t> &'t T: Into>, M: NewService, N: NewService<(Route, T)> + Clone, { type Service = RouteRequest; fn new_service(&mut self, target: T) -> Self::Service { - let rx = target.as_ref().clone(); + let rx = (&target).into(); let inner = self.inner.new_service(target.clone()); let default = self .new_route @@ -93,9 +94,12 @@ where fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { let mut update = None; - while let Poll::Ready(Some(up)) = self.rx.poll_recv_ref(cx) { - update = Some(up.clone()); + if let Some(rx) = self.rx.as_mut() { + while let Poll::Ready(Some(up)) = rx.poll_recv_ref(cx) { + update = Some(up.clone()); + } } + // Every time the profile updates, rebuild the distribution, reusing // services that existed in the prior state. if let Some(Profile { http_routes, .. }) = update { diff --git a/linkerd/service-profiles/src/lib.rs b/linkerd/service-profiles/src/lib.rs index b58e7342e9..e9c3be244b 100644 --- a/linkerd/service-profiles/src/lib.rs +++ b/linkerd/service-profiles/src/lib.rs @@ -14,7 +14,7 @@ pub mod discover; pub mod http; pub mod split; -pub use self::client::{Client, InvalidProfileAddr}; +pub use self::client::Client; pub type Receiver = tokio::sync::watch::Receiver; @@ -37,7 +37,7 @@ pub struct GetProfileService

(P); /// Watches a destination's Profile. pub trait GetProfile { type Error: Into; - type Future: Future>; + type Future: Future, Self::Error>>; fn get_profile(&mut self, target: T) -> Self::Future; @@ -51,7 +51,7 @@ pub trait GetProfile { impl GetProfile for S where - S: tower::Service + Clone, + S: tower::Service> + Clone, S::Error: Into, { type Error = S::Error; @@ -66,7 +66,7 @@ impl tower::Service for GetProfileService

where P: GetProfile, { - type Response = Receiver; + type Response = Option; type Error = P::Error; type Future = P::Future; diff --git a/linkerd/service-profiles/src/split.rs b/linkerd/service-profiles/src/split.rs index d1687bfa82..87f6acff1c 100644 --- a/linkerd/service-profiles/src/split.rs +++ b/linkerd/service-profiles/src/split.rs @@ -34,20 +34,25 @@ pub struct NewSplit { #[derive(Debug)] pub struct Split { - target: T, - rx: Receiver, - new_service: N, - rng: SmallRng, - inner: Option, - services: ReadyCache, + inner: Inner, } #[derive(Debug)] -struct Inner { - distribution: WeightedIndex, - addrs: IndexSet, +enum Inner { + Default(S), + Split { + rng: SmallRng, + rx: Receiver, + target: T, + new_service: N, + distribution: WeightedIndex, + addrs: IndexSet, + services: ReadyCache, + }, } +// === impl NewSplit === + impl Clone for NewSplit { fn clone(&self) -> Self { Self { @@ -58,31 +63,75 @@ impl Clone for NewSplit { } } -impl NewService for NewSplit +impl NewService for NewSplit where - T: AsRef, + T: Clone, + for<'t> &'t T: Into + Into>, + N: NewService<(Option, T), Service = S> + Clone, S: tower::Service, + S::Error: Into, { type Service = Split; fn new_service(&mut self, target: T) -> Self::Service { - let rx = target.as_ref().clone(); - Split { - rx, - target, - new_service: self.inner.clone(), - rng: self.rng.clone(), - inner: None, - services: ReadyCache::default(), - } + // If there is a profile, it is used to configure one or more inner + // services and a concrete address is provided so that the endpoint + // discovery is performed. + // + // Otherwise, profile lookup was rejected and, therefore, no concrete + // address is provided. + let inner = match Into::>::into(&target) { + None => { + trace!("Building default service"); + Inner::Default(self.inner.new_service((None, target))) + } + Some(rx) => { + let mut targets = rx.borrow().targets.clone(); + if targets.len() == 0 { + targets.push(Target { + addr: (&target).into(), + weight: 1, + }) + } + trace!(?targets, "Building split service"); + + let mut addrs = IndexSet::with_capacity(targets.len()); + let mut weights = Vec::with_capacity(targets.len()); + let mut services = ReadyCache::default(); + let mut new_service = self.inner.clone(); + for Target { weight, addr } in targets.into_iter() { + services.push( + addr.clone(), + new_service.new_service((Some(addr.clone()), target.clone())), + ); + addrs.insert(addr); + weights.push(weight); + } + + Inner::Split { + rx, + target, + new_service, + services, + addrs, + distribution: WeightedIndex::new(weights).unwrap(), + rng: self.rng.clone(), + } + } + }; + + Split { inner } } } +// === impl Split === + impl tower::Service for Split where Req: Send + 'static, - T: AsRef + Clone, - N: NewService<(Addr, T), Service = S> + Clone, + T: Clone, + for<'t> &'t T: Into, + N: NewService<(Option, T), Service = S> + Clone, S: tower::Service + Send + 'static, S::Response: Send + 'static, S::Error: Into, @@ -93,116 +142,90 @@ where type Future = Pin> + Send + 'static>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - let mut update = None; - while let Poll::Ready(Some(up)) = self.rx.poll_recv_ref(cx) { - update = Some(up.clone()); - } - // Every time the profile updates, rebuild the distribution, reusing - // services that existed in the prior state. - if let Some(Profile { targets, .. }) = update { - debug!(?targets, "Updating"); - self.update_inner(targets); - } + match self.inner { + Inner::Default(ref mut svc) => svc.poll_ready(cx).map_err(Into::into), + Inner::Split { + ref mut rx, + ref mut services, + ref mut addrs, + ref mut distribution, + ref mut new_service, + ref target, + .. + } => { + let mut update = None; + while let Poll::Ready(Some(up)) = rx.poll_recv_ref(cx) { + update = Some(up.clone()); + } - // If, somehow, the watch hasn't been notified at least once, build the - // default target. This shouldn't actually be exercised, though. - if self.inner.is_none() { - self.update_inner(Vec::new()); - } - debug_assert_ne!(self.services.len(), 0); + // Every time the profile updates, rebuild the distribution, reusing + // services that existed in the prior state. + if let Some(Profile { mut targets, .. }) = update { + if targets.len() == 0 { + targets.push(Target { + addr: target.into(), + weight: 1, + }) + } + debug!(?targets, "Updating"); + + // Replace the old set of addresses with an empty set. The + // prior set is used to determine whether a new service + // needs to be created and what stale services should be + // removed. + let mut prior_addrs = + std::mem::replace(addrs, IndexSet::with_capacity(targets.len())); + let mut weights = Vec::with_capacity(targets.len()); + + // Create an updated distribution and set of services. + for Target { weight, addr } in targets.into_iter() { + // Reuse the prior services whenever possible. + if !prior_addrs.remove(&addr) { + debug!(%addr, "Creating target"); + let svc = new_service.new_service((Some(addr.clone()), target.clone())); + services.push(addr.clone(), svc); + } else { + trace!(%addr, "Target already exists"); + } + addrs.insert(addr); + weights.push(weight); + } + + *distribution = WeightedIndex::new(weights).unwrap(); + + // Remove all prior services that did not exist in the new + // set of targets. + for addr in prior_addrs.into_iter() { + services.evict(&addr); + } + } - // Wait for all target services to be ready. If any services fail, then - // the whole service fails. - Poll::Ready(ready!(self.services.poll_pending(cx)).map_err(Into::into)) + // Wait for all target services to be ready. If any services fail, then + // the whole service fails. + Poll::Ready(ready!(services.poll_pending(cx)).map_err(Into::into)) + } + } } fn call(&mut self, req: Req) -> Self::Future { - let Inner { - ref addrs, - ref distribution, - } = self.inner.as_ref().expect("Called before ready"); - debug_assert_ne!(addrs.len(), 0, "addrs empty"); - debug_assert_eq!(self.services.len(), addrs.len()); - - let idx = if addrs.len() == 1 { - 0 - } else { - distribution.sample(&mut self.rng) - }; - let addr = addrs.get_index(idx).expect("invalid index"); - trace!(%addr, "Dispatching"); - Box::pin(self.services.call_ready(addr, req).err_into::()) - } -} - -impl Split -where - Req: Send + 'static, - T: AsRef + Clone, - N: NewService<(Addr, T), Service = S> + Clone, - S: tower::Service + Send + 'static, - S::Response: Send + 'static, - S::Error: Into, - S::Future: Send, -{ - fn update_inner(&mut self, targets: Vec) { - // Clear out the prior state and preserve its services for reuse. - let mut prior = self.inner.take().map(|i| i.addrs).unwrap_or_default(); - - let mut addrs = IndexSet::with_capacity(targets.len().max(0)); - let mut weights = Vec::with_capacity(targets.len().max(1)); - if targets.len() == 0 { - // If there were no overrides, build a default backend from the - // target. - let addr = self.target.as_ref(); - if !prior.remove(addr) { - debug!(%addr, "Creating default target"); - let svc = self - .new_service - .new_service((addr.clone(), self.target.clone())); - self.services.push(addr.clone(), svc); - } else { - debug!(%addr, "Default target already exists"); - } - addrs.insert(addr.clone()); - weights.push(1); - } else { - // Create an updated distribution and set of services. - for Target { weight, addr } in targets.into_iter() { - // Reuse the prior services whenever possible. - if !prior.remove(&addr) { - debug!(%addr, "Creating target"); - let svc = self - .new_service - .new_service((addr.clone(), self.target.clone())); - self.services.push(addr.clone(), svc); + match self.inner { + Inner::Default(ref mut svc) => Box::pin(svc.call(req).err_into::()), + Inner::Split { + ref addrs, + ref distribution, + ref mut services, + ref mut rng, + .. + } => { + let idx = if addrs.len() == 1 { + 0 } else { - debug!(%addr, "Target already exists"); - } - addrs.insert(addr); - weights.push(weight); + distribution.sample(rng) + }; + let addr = addrs.get_index(idx).expect("invalid index"); + trace!(?addr, "Dispatching"); + Box::pin(services.call_ready(addr, req).err_into::()) } } - - for addr in prior { - self.services.evict(&addr); - } - if !addrs.contains(self.target.as_ref()) { - self.services.evict(self.target.as_ref()); - } - - debug_assert_ne!(addrs.len(), 0, "addrs empty"); - debug_assert_eq!(addrs.len(), weights.len(), "addrs does not match weights"); - // The cache may still contain evicted pending services until the next - // poll. - debug_assert!( - addrs.len() <= self.services.len(), - "addrs does not match the number of services" - ); - let distribution = WeightedIndex::new(weights).expect("Split must be valid"); - self.inner = Some(Inner { - addrs, - distribution, - }); } }