cache: replace Lock with Buffer (#587)
The use of a buffer hides some of the type complexity of the inner
service (all of it, after #586). However, Linkerd currently uses buffers
only at the `Service` level, not at the `MakeService` level. Since
`MakeService`s/`NewService`s tend to be generic over both the inner
`MakeService` type _and_ the produced `Service` (as well as, potentially,
two future types), their concrete types are usually much longer than
those of `Service`s.
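
To make the problem concrete, here is a purely illustrative example (the
layer names below are made up for this sketch): each generic layer names the
entire stack beneath it, so concrete types compound with every layer, and a
`MakeService` stack repeats this for the produced `Service` and its futures.

```rust
// Stand-ins for real middleware: each layer is generic over the stack below it.
struct Timeout<S>(S);
struct Retry<S>(S);
struct Balance<S>(S);
struct TraceContext<S>(S);
struct Connect;

// A few layers in, the concrete type already spells out every layer.
type MakeStack = TraceContext<Retry<Timeout<Balance<Retry<Timeout<Connect>>>>>>;

fn main() {
    // Prints the fully expanded type name, e.g.
    // "TraceContext<Retry<Timeout<Balance<Retry<Timeout<Connect>>>>>>".
    println!("{}", std::any::type_name::<MakeStack>());
}
```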

As @olix0r suggested, we can replace the `Lock` that's currently used to
ensure exclusive access to `Cache`s (which sit at the `MakeService` level)
with `Buffer`s. The `Lock` is essentially already doing something quite
similar to a `Buffer` anyway. Introducing buffers around all caches erases
the inner type for everything layered around the cache, which should make
the overall types much shorter.
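
For illustration, here is a minimal sketch of the idea (this is not the
`linkerd2-buffer` implementation, and names like `SpawnBuffer` are made up
for this example): the inner service is moved onto a spawned task, and
callers hold only a channel handle whose type mentions the request and
response types, so the inner stack's concrete type never appears in
anything layered above the buffer.

```rust
use futures::future;
use tokio::sync::{mpsc, oneshot};
use tower::Service;

/// A handle whose type names only the request and response types; the
/// wrapped service's (potentially enormous) concrete type lives solely on
/// the spawned worker task.
pub struct SpawnBuffer<Req, Rsp> {
    tx: mpsc::Sender<(Req, oneshot::Sender<Rsp>)>,
}

impl<Req, Rsp> SpawnBuffer<Req, Rsp>
where
    Req: Send + 'static,
    Rsp: Send + 'static,
{
    pub fn spawn<S>(mut inner: S, capacity: usize) -> Self
    where
        S: Service<Req, Response = Rsp> + Send + 'static,
        S::Future: Send,
    {
        let (tx, mut rx) = mpsc::channel::<(Req, oneshot::Sender<Rsp>)>(capacity);
        tokio::spawn(async move {
            while let Some((req, rsp_tx)) = rx.recv().await {
                // Wait for the inner service to become ready, dispatch the
                // request, and send the response back to the caller.
                if future::poll_fn(|cx| inner.poll_ready(cx)).await.is_ok() {
                    if let Ok(rsp) = inner.call(req).await {
                        let _ = rsp_tx.send(rsp);
                    }
                }
            }
        });
        Self { tx }
    }

    /// Sends a request to the worker task and awaits its response.
    pub async fn call(&mut self, req: Req) -> Option<Rsp> {
        let (rsp_tx, rsp_rx) = oneshot::channel();
        self.tx.send((req, rsp_tx)).await.ok()?;
        rsp_rx.await.ok()
    }
}
```

Because the handle's type is independent of the wrapped service, putting a
buffer around each `Cache` means everything stacked on top of it sees the
same short type.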

This seems to have a fairly noticeable impact on build time and memory
use (see linkerd/linkerd2#4676). On my machine, running `make docker` on
#586 gets SIGKILLed (presumably by the OOM killer) after 7m53s. After
this change, `make docker` completes successfully after 1m44s.

Also, the `linkerd2-lock` crate can be removed, as it was used only by 
`Cache`.

To work around increased tail latencies when the buffer fills up, this
branch also sets the default buffer capacity equal to the default
concurrency limit, rather than 10. This is fine, since the buffer capacity
isn't _actually_ what bounds the proxy's memory use: the spawned tasks
waiting on a full buffer still sit on the Tokio executor, and it's the
in-flight limit that stops us from accepting more requests when too many
are already in flight.
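
As a rough sketch of that relationship (not the proxy's actual stack
construction: this uses tower's stock `ConcurrencyLimitLayer` and
`BufferLayer` rather than the proxy's own layers, plus a hypothetical
`bound` helper and constants), the in-flight limit sits outside the buffer,
so it is what actually refuses new requests; the buffer merely needs to be
at least as large:

```rust
use tower::buffer::BufferLayer;
use tower::limit::ConcurrencyLimitLayer;
use tower::{Service, ServiceBuilder};

// Hypothetical defaults mirroring the description above: the buffer is sized
// to match the concurrency limit instead of an arbitrary small number.
const MAX_IN_FLIGHT: usize = 10_000;
const BUFFER_CAPACITY: usize = MAX_IN_FLIGHT;

/// Admits at most `MAX_IN_FLIGHT` requests into the buffered stack.
/// (Requires a Tokio runtime, since the buffer spawns a worker task.)
fn bound<S, Req>(inner: S) -> impl Service<Req>
where
    S: Service<Req> + Send + 'static,
    S::Future: Send,
    S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
    Req: Send + 'static,
{
    ServiceBuilder::new()
        // Outermost: stop accepting once MAX_IN_FLIGHT requests are outstanding...
        .layer(ConcurrencyLimitLayer::new(MAX_IN_FLIGHT))
        // ...so the buffer behind it only ever holds already-admitted requests.
        .layer(BufferLayer::new(BUFFER_CAPACITY))
        .service(inner)
}
```

In the proxy the same relationship now holds by construction, since the
in-flight defaults are defined in terms of `DEFAULT_BUFFER_CAPACITY` (see
the `env.rs` change below).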

Depends on #586.
Fixes linkerd/linkerd2#4676.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw authored Jul 1, 2020
1 parent 89d7a30 commit f897208
Showing 16 changed files with 37 additions and 637 deletions.
19 changes: 0 additions & 19 deletions Cargo.lock
@@ -836,7 +836,6 @@ dependencies = [
"linkerd2-exp-backoff",
"linkerd2-http-classify",
"linkerd2-http-metrics",
"linkerd2-lock",
"linkerd2-metrics",
"linkerd2-opencensus",
"linkerd2-proxy-api",
@@ -987,7 +986,6 @@ version = "0.1.0"
dependencies = [
"futures 0.3.5",
"linkerd2-error",
"linkerd2-lock",
"linkerd2-stack",
"tokio 0.2.21",
"tower",
@@ -1167,23 +1165,6 @@ dependencies = [
"tokio-test",
]

- [[package]]
- name = "linkerd2-lock"
- version = "0.1.0"
- dependencies = [
- "futures 0.3.5",
- "linkerd2-error",
- "rand 0.7.2",
- "tokio 0.2.21",
- "tokio-test",
- "tower",
- "tower-test",
- "tracing",
- "tracing-futures",
- "tracing-log",
- "tracing-subscriber",
- ]

[[package]]
name = "linkerd2-metrics"
version = "0.1.0"
1 change: 0 additions & 1 deletion Cargo.toml
@@ -28,7 +28,6 @@ members = [
"linkerd/http-metrics",
"linkerd/identity",
"linkerd/io",
"linkerd/lock",
"linkerd/metrics",
"linkerd/opencensus",
"linkerd/proxy/api-resolve",
1 change: 0 additions & 1 deletion linkerd/app/core/Cargo.toml
@@ -38,7 +38,6 @@ linkerd2-error-respond = { path = "../../error-respond" }
linkerd2-exp-backoff = { path = "../../exp-backoff" }
linkerd2-http-classify = { path = "../../http-classify" }
linkerd2-http-metrics = { path = "../../http-metrics" }
- linkerd2-lock = { path = "../../lock" }
linkerd2-metrics = { path = "../../metrics" }
linkerd2-opencensus = { path = "../../opencensus" }
linkerd2-proxy-core = { path = "../../proxy/core" }
6 changes: 0 additions & 6 deletions linkerd/app/core/src/svc.rs
@@ -5,7 +5,6 @@ use crate::transport::Connect;
use crate::{cache, Error};
pub use linkerd2_buffer as buffer;
use linkerd2_concurrency_limit as concurrency_limit;
- pub use linkerd2_lock as lock;
pub use linkerd2_stack::{self as stack, layer, NewService};
pub use linkerd2_stack_tracing::{InstrumentMake, InstrumentMakeLayer};
pub use linkerd2_timeout as timeout;
@@ -130,11 +129,6 @@ impl<L> Layers<L> {
self.push(buffer::SpawnBufferLayer::new(capacity).with_idle_timeout(idle_timeout))
}

- /// Makes the inner service shareable in a mutually-exclusive fashion.
- pub fn push_lock(self) -> Layers<Pair<L, lock::LockLayer>> {
- self.push(lock::LockLayer::new())
- }

// Makes the service eagerly process and fail requests after the given timeout.
pub fn push_failfast(self, timeout: Duration) -> Layers<Pair<L, timeout::FailFastLayer>> {
self.push(timeout::FailFastLayer::new(timeout))
7 changes: 2 additions & 5 deletions linkerd/app/inbound/src/lib.rs
@@ -237,10 +237,8 @@ impl Config {
.push(metrics.stack.layer(stack_labels("target"))),
),
)
+ .spawn_buffer(buffer_capacity)
.instrument(|_: &Target| info_span!("target"))
- // Prevent the cache's lock from being acquired in poll_ready, ensuring this happens
- // in the response future. This prevents buffers from holding the cache's lock.
- .push_oneshot()
.check_service::<Target>();

// Routes targets to a Profile stack, i.e. so that profile
@@ -268,10 +266,9 @@
.push(metrics.stack.layer(stack_labels("profile"))),
),
)
+ .spawn_buffer(buffer_capacity)
.instrument(|p: &Profile| info_span!("profile", addr = %p.addr()))
.check_make_service::<Profile, Target>()
- // Ensures that cache's lock isn't held in poll_ready.
- .push_oneshot()
.push(router::Layer::new(|()| ProfileTarget))
.check_new_service_routes::<(), Target>()
.new_service(());
19 changes: 6 additions & 13 deletions linkerd/app/outbound/src/lib.rs
@@ -117,11 +117,10 @@ impl Config {
.push(metrics.layer(stack_labels("refine"))),
),
)
+ .spawn_buffer(self.proxy.buffer_capacity)
.instrument(|name: &dns::Name| info_span!("refine", %name))
// Obtains the service, advances the state of the resolution
.push(svc::make_response::Layer)
- // Ensures that the cache isn't locked when polling readiness.
- .push_oneshot()
.into_inner()
}

Expand Down Expand Up @@ -295,9 +294,8 @@ impl Config {
.push(metrics.stack.layer(stack_labels("balance"))),
),
)
- .instrument(|c: &Concrete<http::Settings>| info_span!("balance", addr = %c.addr))
- // Ensure that buffers don't hold the cache's lock in poll_ready.
- .push_oneshot();
+ .spawn_buffer(buffer_capacity)
+ .instrument(|c: &Concrete<http::Settings>| info_span!("balance", addr = %c.addr));

// Caches clients that bypass discovery/balancing.
//
@@ -319,16 +317,15 @@
.push(metrics.stack.layer(stack_labels("forward.endpoint"))),
),
)
+ .spawn_buffer(buffer_capacity)
.instrument(|endpoint: &Target<HttpEndpoint>| {
info_span!("forward", peer.addr = %endpoint.addr, peer.id = ?endpoint.inner.identity)
})
.push_map_target(|t: Concrete<HttpEndpoint>| Target {
addr: t.addr.into(),
inner: t.inner.inner,
})
- .check_service::<Concrete<HttpEndpoint>>()
- // Ensure that buffers don't hold the cache's lock in poll_ready.
- .push_oneshot();
+ .check_service::<Concrete<HttpEndpoint>>();

// If the balancer fails to be created, i.e., because it is unresolvable, fall back to
// using a router that dispatches request to the application-selected original destination.
@@ -396,9 +393,8 @@
.push(metrics.stack.layer(stack_labels("profile"))),
),
)
+ .spawn_buffer(buffer_capacity)
.instrument(|_: &Profile| info_span!("profile"))
- // Ensures that the cache isn't locked when polling readiness.
- .push_oneshot()
.check_make_service::<Profile, Logical<HttpEndpoint>>()
.push(router::Layer::new(|()| ProfilePerTarget))
.check_new_service_routes::<(), Logical<HttpEndpoint>>()
@@ -638,9 +634,6 @@ fn is_discovery_rejected(err: &Error) -> bool {
if let Some(e) = err.downcast_ref::<svc::buffer::error::ServiceError>() {
return is_discovery_rejected(e.inner());
}
- if let Some(e) = err.downcast_ref::<svc::lock::error::ServiceError>() {
- return is_discovery_rejected(e.inner());
- }

err.is::<DiscoveryRejected>() || err.is::<profiles::InvalidProfileAddr>()
}
12 changes: 5 additions & 7 deletions linkerd/app/src/env.rs
@@ -206,17 +206,15 @@ const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf";
const DEFAULT_INITIAL_STREAM_WINDOW_SIZE: u32 = 65_535; // Protocol default
const DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE: u32 = 1048576; // 1MB ~ 16 streams at capacity

- // Because buffers propagate readiness, they should only need enough capacity to satisfy the
- // process's concurrency. This should probably be derived from the number of CPUs, but the num-cpus
- // crate does not support cgroups yet [seanmonstar/num_cpus#80].
- const DEFAULT_BUFFER_CAPACITY: usize = 10;
+ // 10_000 is arbitrarily chosen for now...
+ const DEFAULT_BUFFER_CAPACITY: usize = 10_000;

const DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);

- // 10_000 is arbitrarily chosen for now...
- const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = 10_000;
- const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = 10_000;
+ // By default, don't accept more requests than we can buffer.
+ const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;
+ const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;

const DEFAULT_DESTINATION_GET_SUFFIXES: &str = "svc.cluster.local.";
const DEFAULT_DESTINATION_PROFILE_SUFFIXES: &str = "svc.cluster.local.";
1 change: 0 additions & 1 deletion linkerd/cache/Cargo.toml
@@ -8,7 +8,6 @@ publish = false
[dependencies]
futures = "0.3"
linkerd2-error = { path = "../error" }
- linkerd2-lock = { path = "../lock" }
linkerd2-stack = { path = "../stack" }
tokio = "0.2"
tower = { version = "0.3", default-features = false, features = ["util"] }
69 changes: 24 additions & 45 deletions linkerd/cache/src/lib.rs
@@ -1,7 +1,6 @@
#![deny(warnings, rust_2018_idioms)]
use futures::future;
use linkerd2_error::Never;
- use linkerd2_lock::{Guard, Lock};
use linkerd2_stack::NewService;
use std::collections::HashMap;
use std::hash::Hash;
@@ -19,8 +18,7 @@ where
N: NewService<(T, Handle)>,
{
new_service: N,
- lock: Lock<Services<T, N::Service>>,
- guard: Option<Guard<Services<T, N::Service>>>,
+ services: Services<T, N::Service>,
}

/// A tracker inserted into each inner service that, when dropped, indicates the service may be
@@ -34,68 +32,33 @@ type Services<T, S> = HashMap<T, (S, Weak<()>)>;

impl<T, N> Cache<T, N>
where
- T: Eq + Hash + Send + 'static,
+ T: Eq + Hash + Send,
N: NewService<(T, Handle)>,
{
pub fn new(new_service: N) -> Self {
Self {
new_service,
- guard: None,
- lock: Lock::new(Services::default()),
- }
- }
- }

- impl<T, N> Clone for Cache<T, N>
- where
- T: Clone + Eq + Hash,
- N: NewService<(T, Handle)> + Clone,
- N::Service: Clone,
- {
- fn clone(&self) -> Self {
- Self {
- new_service: self.new_service.clone(),
- lock: self.lock.clone(),
- guard: None,
+ services: Services::default(),
}
}
}

impl<T, N> tower::Service<T> for Cache<T, N>
where
- T: Clone + Eq + Hash + Send + 'static,
+ T: Clone + Eq + Hash + Send,
N: NewService<(T, Handle)>,
- N::Service: Clone + Send + 'static,
+ N::Service: Clone + Send,
{
type Response = N::Service;
type Error = Never;
type Future = future::Ready<Result<Self::Response, Self::Error>>;

- fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- if self.guard.is_none() {
- let mut services = futures::ready!(self.lock.poll_acquire(cx));
- // Drop defunct services before interacting with the cache.
- let n = services.len();
- services.retain(|_, (_, weak)| {
- if weak.strong_count() > 0 {
- true
- } else {
- trace!("Dropping defunct service");
- false
- }
- });
- debug!(services = services.len(), dropped = n - services.len());
- self.guard = Some(services);
- }

- debug_assert!(self.guard.is_some(), "guard must be acquired");
+ fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, target: T) -> Self::Future {
- let mut services = self.guard.take().expect("poll_ready must be called");

- if let Some((service, weak)) = services.get(&target) {
+ if let Some((service, weak)) = self.services.get(&target) {
if weak.upgrade().is_some() {
trace!("Using cached service");
return future::ok(service.clone());
@@ -109,8 +72,24 @@ where
.new_service
.new_service((target.clone(), Handle(handle)));

+ // Drop defunct services before inserting the new service into the
+ // cache.
+ let n = self.services.len();
+ self.services.retain(|_, (_, weak)| {
+ if weak.strong_count() > 0 {
+ true
+ } else {
+ trace!("Dropping defunct service");
+ false
+ }
+ });
+ debug!(
+ services = self.services.len(),
+ dropped = n - self.services.len()
+ );

debug!("Caching new service");
- services.insert(target, (service.clone(), weak));
+ self.services.insert(target, (service.clone(), weak));

future::ok(service.into())
}
25 changes: 0 additions & 25 deletions linkerd/lock/Cargo.toml

This file was deleted.

29 changes: 0 additions & 29 deletions linkerd/lock/src/error.rs

This file was deleted.

20 changes: 0 additions & 20 deletions linkerd/lock/src/layer.rs

This file was deleted.

