From f8972084bc5d210808eb310ccac272d4ef7a75d9 Mon Sep 17 00:00:00 2001
From: Eliza Weisman <eliza@buoyant.io>
Date: Wed, 1 Jul 2020 14:13:54 -0700
Subject: [PATCH] cache: replace Lock with Buffer (#587)

The use of a buffer hides some of the type complexity of the inner
service (all of the type complexity, after #586). However, Linkerd
currently only uses buffers at the `Service` level, not at the
`MakeService` level. Since `MakeService`/`NewService`s tend to be
generic over both the inner `MakeService` type _and_ the produced
`Service` type (as well as, potentially, two future types), their
concrete types are in most cases much longer than those of `Service`s.

As @olix0r suggested, we can replace the `Lock` that currently ensures
exclusive access to `Cache`s (which operate at the `MakeService` level)
with `Buffer`s; the `Lock` is already doing something quite similar to
a `Buffer` anyway. Introducing buffers around all caches erases the
inner type for everything layered around the cache, which should make
the overall type length much shorter. This has a fairly noticeable
impact on build time and memory use (see linkerd/linkerd2#4676): on my
machine, running `make docker` on #586 gets SIGKILLed (presumably by
the OOM killer) after 7m53s, while with this change, `make docker`
completes successfully after 1m44s.

The `linkerd2-lock` crate can also be removed, as it was used only by
`Cache`.

To work around increased tail latencies when the buffer fills up, this
branch also sets the default buffer capacity equal to the default
concurrency limit, rather than 10. This is fine, since the buffer
capacity isn't _actually_ what enforces a bound on proxy memory use:
the tasks spawned to wait on a full buffer still sit on the Tokio
executor, and it's the in-flight limit that stops us from accepting
more requests when too many are already in flight.

Depends on #586.

Fixes linkerd/linkerd2#4676.
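As an illustration of why buffering erases type complexity, here is a
minimal, self-contained sketch of the idea (all names here are
hypothetical, and it uses plain Tokio channels rather than the actual
`Buffer` middleware): the caller-facing handle is just the sending side
of a channel, so its type names only the request and response types, no
matter how deeply the inner service is layered.

```rust
use tokio::sync::{mpsc, oneshot};

type Request = String;
type Response = String;

/// The caller-facing handle: its type mentions only `Request` and
/// `Response`, never the (potentially enormous) type of the inner service.
#[derive(Clone)]
struct BufferedHandle {
    tx: mpsc::Sender<(Request, oneshot::Sender<Response>)>,
}

impl BufferedHandle {
    async fn call(&mut self, req: Request) -> Response {
        let (rsp_tx, rsp_rx) = oneshot::channel();
        self.tx.send((req, rsp_tx)).await.expect("worker task died");
        rsp_rx.await.expect("worker dropped the response sender")
    }
}

/// Spawns a worker task that owns `inner` exclusively; `capacity` bounds
/// how many requests may queue, playing the role of the buffer capacity.
fn spawn_buffered<S>(mut inner: S, capacity: usize) -> BufferedHandle
where
    S: FnMut(Request) -> Response + Send + 'static,
{
    let (tx, mut rx) = mpsc::channel(capacity);
    tokio::spawn(async move {
        // Requests are serialized through the channel, so the worker has
        // exclusive access to `inner` without any Lock.
        while let Some((req, rsp_tx)) = rx.recv().await {
            let _ = rsp_tx.send(inner(req));
        }
    });
    BufferedHandle { tx }
}

#[tokio::main]
async fn main() {
    let mut handle = spawn_buffered(|req| format!("hello, {}", req), 10_000);
    assert_eq!(handle.call("cache".into()).await, "hello, cache");
}
```

The real `Buffer` works analogously: a spawned worker task owns the
inner service, clones of the handle share it without locking, and the
channel bound is the buffer capacity discussed above.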
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
---
 Cargo.lock                      |  19 ---
 Cargo.toml                      |   1 -
 linkerd/app/core/Cargo.toml     |   1 -
 linkerd/app/core/src/svc.rs     |   6 -
 linkerd/app/inbound/src/lib.rs  |   7 +-
 linkerd/app/outbound/src/lib.rs |  19 +--
 linkerd/app/src/env.rs          |  12 +-
 linkerd/cache/Cargo.toml        |   1 -
 linkerd/cache/src/lib.rs        |  69 +++-----
 linkerd/lock/Cargo.toml         |  25 ---
 linkerd/lock/src/error.rs       |  29 ----
 linkerd/lock/src/layer.rs       |  20 ---
 linkerd/lock/src/lib.rs         |  16 --
 linkerd/lock/src/lock.rs        |  79 ---------
 linkerd/lock/src/service.rs     |  90 ----------
 linkerd/lock/src/test.rs        | 280 --------------------------------
 16 files changed, 37 insertions(+), 637 deletions(-)
 delete mode 100644 linkerd/lock/Cargo.toml
 delete mode 100644 linkerd/lock/src/error.rs
 delete mode 100644 linkerd/lock/src/layer.rs
 delete mode 100644 linkerd/lock/src/lib.rs
 delete mode 100644 linkerd/lock/src/lock.rs
 delete mode 100644 linkerd/lock/src/service.rs
 delete mode 100644 linkerd/lock/src/test.rs

diff --git a/Cargo.lock b/Cargo.lock
index e7377cd2ff..44809b1d44 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -836,7 +836,6 @@ dependencies = [
  "linkerd2-exp-backoff",
  "linkerd2-http-classify",
  "linkerd2-http-metrics",
- "linkerd2-lock",
  "linkerd2-metrics",
  "linkerd2-opencensus",
  "linkerd2-proxy-api",
@@ -987,7 +986,6 @@ version = "0.1.0"
 dependencies = [
  "futures 0.3.5",
  "linkerd2-error",
- "linkerd2-lock",
  "linkerd2-stack",
  "tokio 0.2.21",
  "tower",
@@ -1167,23 +1165,6 @@ dependencies = [
  "tokio-test",
 ]
 
-[[package]]
-name = "linkerd2-lock"
-version = "0.1.0"
-dependencies = [
- "futures 0.3.5",
- "linkerd2-error",
- "rand 0.7.2",
- "tokio 0.2.21",
- "tokio-test",
- "tower",
- "tower-test",
- "tracing",
- "tracing-futures",
- "tracing-log",
- "tracing-subscriber",
-]
-
 [[package]]
 name = "linkerd2-metrics"
 version = "0.1.0"
diff --git a/Cargo.toml b/Cargo.toml
index b1372b93b5..fdb0bd336d 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,7 +28,6 @@ members = [
     "linkerd/http-metrics",
     "linkerd/identity",
     "linkerd/io",
-    "linkerd/lock",
    "linkerd/metrics",
     "linkerd/opencensus",
     "linkerd/proxy/api-resolve",
diff --git a/linkerd/app/core/Cargo.toml b/linkerd/app/core/Cargo.toml
index 7cc57d505e..7fd1b4ed43 100644
--- a/linkerd/app/core/Cargo.toml
+++ b/linkerd/app/core/Cargo.toml
@@ -38,7 +38,6 @@ linkerd2-error-respond = { path = "../../error-respond" }
 linkerd2-exp-backoff = { path = "../../exp-backoff" }
 linkerd2-http-classify = { path = "../../http-classify" }
 linkerd2-http-metrics = { path = "../../http-metrics" }
-linkerd2-lock = { path = "../../lock" }
 linkerd2-metrics = { path = "../../metrics" }
 linkerd2-opencensus = { path = "../../opencensus" }
 linkerd2-proxy-core = { path = "../../proxy/core" }
diff --git a/linkerd/app/core/src/svc.rs b/linkerd/app/core/src/svc.rs
index 0ce851a44b..a64146bc4e 100644
--- a/linkerd/app/core/src/svc.rs
+++ b/linkerd/app/core/src/svc.rs
@@ -5,7 +5,6 @@ use crate::transport::Connect;
 use crate::{cache, Error};
 pub use linkerd2_buffer as buffer;
 use linkerd2_concurrency_limit as concurrency_limit;
-pub use linkerd2_lock as lock;
 pub use linkerd2_stack::{self as stack, layer, NewService};
 pub use linkerd2_stack_tracing::{InstrumentMake, InstrumentMakeLayer};
 pub use linkerd2_timeout as timeout;
@@ -130,11 +129,6 @@ impl<L> Layers<L> {
         self.push(buffer::SpawnBufferLayer::new(capacity).with_idle_timeout(idle_timeout))
     }
 
-    /// Makes the inner service shareable in a mutually-exclusive fashion.
-    pub fn push_lock(self) -> Layers<Pair<L, lock::LockLayer>> {
-        self.push(lock::LockLayer::new())
-    }
-
     // Makes the service eagerly process and fail requests after the given timeout.
     pub fn push_failfast(self, timeout: Duration) -> Layers<Pair<L, timeout::FailFastLayer>> {
         self.push(timeout::FailFastLayer::new(timeout))
diff --git a/linkerd/app/inbound/src/lib.rs b/linkerd/app/inbound/src/lib.rs
index 421ced49bc..41b850f612 100644
--- a/linkerd/app/inbound/src/lib.rs
+++ b/linkerd/app/inbound/src/lib.rs
@@ -237,10 +237,8 @@ impl Config {
                         .push(metrics.stack.layer(stack_labels("target"))),
                 ),
             )
+            .spawn_buffer(buffer_capacity)
             .instrument(|_: &Target| info_span!("target"))
-            // Prevent the cache's lock from being acquired in poll_ready, ensuring this happens
-            // in the response future. This prevents buffers from holding the cache's lock.
-            .push_oneshot()
             .check_service::<Target>();
 
         // Routes targets to a Profile stack, i.e. so that profile
@@ -268,10 +266,9 @@ impl Config {
                         .push(metrics.stack.layer(stack_labels("profile"))),
                 ),
             )
+            .spawn_buffer(buffer_capacity)
             .instrument(|p: &Profile| info_span!("profile", addr = %p.addr()))
             .check_make_service::<Profile, Target>()
-            // Ensures that the cache's lock isn't held in poll_ready.
-            .push_oneshot()
             .push(router::Layer::new(|()| ProfileTarget))
             .check_new_service_routes::<(), Target>()
             .new_service(());
diff --git a/linkerd/app/outbound/src/lib.rs b/linkerd/app/outbound/src/lib.rs
index 15a6d790b5..e0a3c8e648 100644
--- a/linkerd/app/outbound/src/lib.rs
+++ b/linkerd/app/outbound/src/lib.rs
@@ -117,11 +117,10 @@ impl Config {
                         .push(metrics.layer(stack_labels("refine"))),
                 ),
             )
+            .spawn_buffer(self.proxy.buffer_capacity)
             .instrument(|name: &dns::Name| info_span!("refine", %name))
             // Obtains the service, advances the state of the resolution
             .push(svc::make_response::Layer)
-            // Ensures that the cache isn't locked when polling readiness.
-            .push_oneshot()
             .into_inner()
     }
 
@@ -295,9 +294,8 @@ impl Config {
                         .push(metrics.stack.layer(stack_labels("balance"))),
                 ),
             )
-            .instrument(|c: &Concrete| info_span!("balance", addr = %c.addr))
-            // Ensure that buffers don't hold the cache's lock in poll_ready.
-            .push_oneshot();
+            .spawn_buffer(buffer_capacity)
+            .instrument(|c: &Concrete| info_span!("balance", addr = %c.addr));
 
         // Caches clients that bypass discovery/balancing.
         //
@@ -319,6 +317,7 @@ impl Config {
                         .push(metrics.stack.layer(stack_labels("forward.endpoint"))),
                 ),
             )
+            .spawn_buffer(buffer_capacity)
             .instrument(|endpoint: &Target<HttpEndpoint>| {
                 info_span!("forward", peer.addr = %endpoint.addr, peer.id = ?endpoint.inner.identity)
             })
             .push_map_target(|t: Concrete| Target {
                 addr: t.addr.into(),
                 inner: t.inner.inner,
             })
-            .check_service::<Target<HttpEndpoint>>()
-            // Ensure that buffers don't hold the cache's lock in poll_ready.
-            .push_oneshot();
+            .check_service::<Target<HttpEndpoint>>();
 
         // If the balancer fails to be created, i.e., because it is unresolvable, fall back to
         // using a router that dispatches requests to the application-selected original destination.
@@ -396,9 +393,8 @@ impl Config {
                         .push(metrics.stack.layer(stack_labels("profile"))),
                 ),
             )
+            .spawn_buffer(buffer_capacity)
             .instrument(|_: &Profile| info_span!("profile"))
-            // Ensures that the cache isn't locked when polling readiness.
-            .push_oneshot()
             .check_make_service::<Profile, Logical<HttpEndpoint>>()
             .push(router::Layer::new(|()| ProfilePerTarget))
             .check_new_service_routes::<(), Logical<HttpEndpoint>>()
             .new_service(());
@@ -638,9 +634,6 @@
 fn is_discovery_rejected(err: &Error) -> bool {
     if let Some(e) = err.downcast_ref::<buffer::error::ServiceError>() {
         return is_discovery_rejected(e.inner());
     }
-    if let Some(e) = err.downcast_ref::<lock::error::ServiceError>() {
-        return is_discovery_rejected(e.inner());
-    }
     err.is::<DiscoveryRejected>() || err.is::<profiles::InvalidProfileAddr>()
 }
diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs
index 47e5362efa..367776fbda 100644
--- a/linkerd/app/src/env.rs
+++ b/linkerd/app/src/env.rs
@@ -206,17 +206,15 @@ const DEFAULT_RESOLV_CONF: &str = "/etc/resolv.conf";
 
 const DEFAULT_INITIAL_STREAM_WINDOW_SIZE: u32 = 65_535; // Protocol default
 const DEFAULT_INITIAL_CONNECTION_WINDOW_SIZE: u32 = 1048576; // 1MB ~ 16 streams at capacity
 
-// Because buffers propagate readiness, they should only need enough capacity to satisfy the
-// process's concurrency. This should probably be derived from the number of CPUs, but the num-cpus
-// crate does not support cgroups yet [seanmonstar/num_cpus#80].
-const DEFAULT_BUFFER_CAPACITY: usize = 10;
+// 10_000 is arbitrarily chosen for now...
+const DEFAULT_BUFFER_CAPACITY: usize = 10_000;
 
 const DEFAULT_INBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
 const DEFAULT_OUTBOUND_ROUTER_MAX_IDLE_AGE: Duration = Duration::from_secs(60);
 
-// 10_000 is arbitrarily chosen for now...
-const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = 10_000;
-const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = 10_000;
+// By default, don't accept more requests than we can buffer.
+const DEFAULT_INBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;
+const DEFAULT_OUTBOUND_MAX_IN_FLIGHT: usize = DEFAULT_BUFFER_CAPACITY;
 
 const DEFAULT_DESTINATION_GET_SUFFIXES: &str = "svc.cluster.local.";
 const DEFAULT_DESTINATION_PROFILE_SUFFIXES: &str = "svc.cluster.local.";
diff --git a/linkerd/cache/Cargo.toml b/linkerd/cache/Cargo.toml
index 3e23702fce..822b8851b0 100644
--- a/linkerd/cache/Cargo.toml
+++ b/linkerd/cache/Cargo.toml
@@ -8,7 +8,6 @@ publish = false
 [dependencies]
 futures = "0.3"
 linkerd2-error = { path = "../error" }
-linkerd2-lock = { path = "../lock" }
 linkerd2-stack = { path = "../stack" }
 tokio = "0.2"
 tower = { version = "0.3", default-features = false, features = ["util"] }
diff --git a/linkerd/cache/src/lib.rs b/linkerd/cache/src/lib.rs
index 24eb404b04..bb903895fd 100644
--- a/linkerd/cache/src/lib.rs
+++ b/linkerd/cache/src/lib.rs
@@ -1,7 +1,6 @@
 #![deny(warnings, rust_2018_idioms)]
 use futures::future;
 use linkerd2_error::Never;
-use linkerd2_lock::{Guard, Lock};
 use linkerd2_stack::NewService;
 use std::collections::HashMap;
 use std::hash::Hash;
@@ -19,8 +18,7 @@ where
     N: NewService<(T, Handle)>,
 {
     new_service: N,
-    lock: Lock<Services<T, N::Service>>,
-    guard: Option<Guard<Services<T, N::Service>>>,
+    services: Services<T, N::Service>,
 }
 
 /// A tracker inserted into each inner service that, when dropped, indicates the service may be
@@ -34,68 +32,33 @@ type Services<T, S> = HashMap<T, (S, Weak<()>)>;
 
 impl<T, N> Cache<T, N>
 where
-    T: Eq + Hash + Send + 'static,
+    T: Eq + Hash + Send,
     N: NewService<(T, Handle)>,
 {
     pub fn new(new_service: N) -> Self {
         Self {
             new_service,
-            guard: None,
-            lock: Lock::new(Services::default()),
+            services: Services::default(),
         }
     }
 }
 
-impl<T, N> Clone for Cache<T, N>
-where
-    T: Clone + Eq + Hash,
-    N: NewService<(T, Handle)> + Clone,
-    N::Service: Clone,
-{
-    fn clone(&self) -> Self {
-        Self {
-            new_service: self.new_service.clone(),
-            lock: self.lock.clone(),
-            guard: None,
-        }
-    }
-}
-
 impl<T, N> tower::Service<T> for Cache<T, N>
 where
-    T: Clone + Eq + Hash + Send + 'static,
+    T: Clone + Eq + Hash + Send,
     N: NewService<(T, Handle)>,
-    N::Service: Clone + Send + 'static,
+    N::Service: Clone + Send,
 {
     type Response = N::Service;
     type Error = Never;
     type Future = future::Ready<Result<Self::Response, Self::Error>>;
 
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        if self.guard.is_none() {
-            let mut services = futures::ready!(self.lock.poll_acquire(cx));
-            // Drop defunct services before interacting with the cache.
-            let n = services.len();
-            services.retain(|_, (_, weak)| {
-                if weak.strong_count() > 0 {
-                    true
-                } else {
-                    trace!("Dropping defunct service");
-                    false
-                }
-            });
-            debug!(services = services.len(), dropped = n - services.len());
-            self.guard = Some(services);
-        }
-
-        debug_assert!(self.guard.is_some(), "guard must be acquired");
+    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
         Poll::Ready(Ok(()))
     }
 
     fn call(&mut self, target: T) -> Self::Future {
-        let mut services = self.guard.take().expect("poll_ready must be called");
-
-        if let Some((service, weak)) = services.get(&target) {
+        if let Some((service, weak)) = self.services.get(&target) {
             if weak.upgrade().is_some() {
                 trace!("Using cached service");
                 return future::ok(service.clone());
@@ -109,8 +72,24 @@
             .new_service
             .new_service((target.clone(), Handle(handle)));
 
+        // Drop defunct services before inserting the new service into the
+        // cache.
+        let n = self.services.len();
+        self.services.retain(|_, (_, weak)| {
+            if weak.strong_count() > 0 {
+                true
+            } else {
+                trace!("Dropping defunct service");
+                false
+            }
+        });
+        debug!(
+            services = self.services.len(),
+            dropped = n - self.services.len()
+        );
+
         debug!("Caching new service");
-        services.insert(target, (service.clone(), weak));
+        self.services.insert(target, (service.clone(), weak));
 
         future::ok(service.into())
     }
diff --git a/linkerd/lock/Cargo.toml b/linkerd/lock/Cargo.toml
deleted file mode 100644
index e902fcb3d4..0000000000
--- a/linkerd/lock/Cargo.toml
+++ /dev/null
@@ -1,25 +0,0 @@
-[package]
-name = "linkerd2-lock"
-version = "0.1.0"
-authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
-edition = "2018"
-publish = false
-description = """
-A middleware that provides mutual exclusion.
-"""
-
-[dependencies]
-futures = "0.3"
-linkerd2-error = { path = "../error" }
-tower = { version = "0.3", default-features = false }
-tracing = "0.1"
-tokio = { version = "0.2.19", features = ["sync", "macros", "rt-core"] }
-
-[dev-dependencies]
-rand = "0.7"
-tracing-futures = { version = "0.2", features = ["std-future"] }
-tracing-log = "0.1"
-tracing-subscriber = "0.2.5"
-tower = { version = "0.3", default-features = false, features = ["util"] }
-tokio-test = "0.2"
-tower-test = "0.3"
diff --git a/linkerd/lock/src/error.rs b/linkerd/lock/src/error.rs
deleted file mode 100644
index 771dc584b6..0000000000
--- a/linkerd/lock/src/error.rs
+++ /dev/null
@@ -1,29 +0,0 @@
-pub use linkerd2_error::Error;
-use std::sync::Arc;
-
-#[derive(Clone, Debug)]
-pub struct ServiceError(Arc<Error>);
-
-// === impl ServiceError ===
-
-impl ServiceError {
-    pub(crate) fn new(e: Arc<Error>) -> Self {
-        ServiceError(e)
-    }
-
-    pub fn inner(&self) -> &Error {
-        self.0.as_ref()
-    }
-}
-
-impl std::fmt::Display for ServiceError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        self.0.fmt(f)
-    }
-}
-
-impl std::error::Error for ServiceError {
-    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
-        Some(&**self.0.as_ref())
-    }
-}
diff --git a/linkerd/lock/src/layer.rs b/linkerd/lock/src/layer.rs
deleted file mode 100644
index 4abdf0c7f9..0000000000
--- a/linkerd/lock/src/layer.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-use super::LockService;
-
-#[derive(Clone, Debug, Default)]
-pub struct LockLayer(());
-
-// === impl Layer ===
-
-impl LockLayer {
-    pub fn new() -> Self {
-        LockLayer(())
-    }
-}
-
-impl<S> tower::layer::Layer<S> for LockLayer {
-    type Service = LockService<S>;
-
-    fn layer(&self, inner: S) -> Self::Service {
-        Self::Service::new(inner)
-    }
-}
diff --git a/linkerd/lock/src/lib.rs b/linkerd/lock/src/lib.rs
deleted file mode 100644
index 5de5d14178..0000000000
--- a/linkerd/lock/src/lib.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-//! A middleware for sharing an inner service via mutual exclusion.
-
-#![deny(warnings, rust_2018_idioms)]
-
-pub mod error;
-mod layer;
-mod lock;
-mod service;
-#[cfg(test)]
-mod test;
-
-pub use self::{
-    layer::LockLayer,
-    lock::{Guard, Lock},
-    service::LockService,
-};
diff --git a/linkerd/lock/src/lock.rs b/linkerd/lock/src/lock.rs
deleted file mode 100644
index ba8f0dcf1a..0000000000
--- a/linkerd/lock/src/lock.rs
+++ /dev/null
@@ -1,79 +0,0 @@
-use std::task::{Context, Poll};
-use std::{future::Future, pin::Pin, sync::Arc};
-use tokio::sync::Mutex;
-pub use tokio::sync::OwnedMutexGuard as Guard;
-
-/// Provides mutually exclusive access to an `S`-typed value, asynchronously.
-pub struct Lock<S> {
-    /// Set when this Lock is interested in acquiring the value.
-    waiting: Option<Pin<Box<dyn Future<Output = Guard<S>> + Send + 'static>>>,
-    lock: Arc<Mutex<S>>,
-}
-
-// === impl Lock ===
-
-impl<S> Lock<S> {
-    pub fn new(value: S) -> Self {
-        Self {
-            waiting: None,
-            lock: Arc::new(Mutex::new(value)),
-        }
-    }
-}
-
-impl<S> Clone for Lock<S> {
-    fn clone(&self) -> Self {
-        Self {
-            // Clones have an independent local lock state.
-            waiting: None,
-            lock: self.lock.clone(),
-        }
-    }
-}
-
-impl<S: Send + 'static> Lock<S> {
-    /// Attempt to acquire the lock, returning `Pending` if it is held
-    /// elsewhere.
-    ///
-    /// If this `Lock` instance is not already waiting, it registers interest in the lock.
-    pub fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<Guard<S>> {
-        // If we have already registered interest in the lock and are waiting on
-        // a future, we'll poll that. Otherwise, we need to create a future.
-        //
-        // We `take` the waiting future so that if we drive it to completion on
-        // this poll, it won't be set on subsequent polls.
-        let mut waiting = self.waiting.take().unwrap_or_else(|| {
-            // This instance has not registered interest in the lock.
-            Box::pin(self.lock.clone().lock_owned())
-        });
-
-        // Poll the future.
-        let res = {
-            let future = &mut waiting;
-            tokio::pin!(future);
-            future.poll(cx)
-        };
-
-        tracing::trace!(ready = res.is_ready());
-
-        // If the future hasn't completed, save it to be polled again the next
-        // time `poll_acquire` is called.
-        if res.is_pending() {
-            self.waiting = Some(waiting);
-        }
-
-        res
-    }
-}
-
-impl<S: Send + 'static> Lock<S> {
-    // optimization so we can elide the `Box::pin` when we just want a future
-    pub async fn acquire(&mut self) -> Guard<S> {
-        // Are we already waiting? If so, reuse that...
-        if let Some(waiting) = self.waiting.take() {
-            waiting.await
-        } else {
-            self.lock.clone().lock_owned().await
-        }
-    }
-}
diff --git a/linkerd/lock/src/service.rs b/linkerd/lock/src/service.rs
deleted file mode 100644
index 2c6eb9dc37..0000000000
--- a/linkerd/lock/src/service.rs
+++ /dev/null
@@ -1,90 +0,0 @@
-use crate::error::{Error, ServiceError};
-use crate::{Guard, Lock};
-use futures::{future, TryFutureExt};
-use std::sync::Arc;
-use std::task::{Context, Poll};
-use tracing::trace;
-
-/// A middleware that safely shares an inner service among clones.
-///
-/// As the service is polled to readiness, the lock is acquired and the inner service is polled. If
-/// the service is cloned, the service's lock state is not retained by the clone.
-pub struct LockService<S> {
-    lock: Lock<Result<S, ServiceError>>,
-    guard: Option<Guard<Result<S, ServiceError>>>,
-}
-
-impl<S> LockService<S> {
-    pub fn new(inner: S) -> Self {
-        Self {
-            lock: Lock::new(Ok(inner)),
-            guard: None,
-        }
-    }
-}
-
-impl<S> Clone for LockService<S> {
-    fn clone(&self) -> Self {
-        Self {
-            lock: self.lock.clone(),
-            guard: None,
-        }
-    }
-}
-
-impl<T, S> tower::Service<T> for LockService<S>
-where
-    S: tower::Service<T> + Send + 'static,
-    S::Error: Into<Error>,
-{
-    type Response = S::Response;
-    type Error = Error;
-    type Future = future::MapErr<S::Future, fn(S::Error) -> Self::Error>;
-
-    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        loop {
-            trace!(acquired = self.guard.is_some());
-            if let Some(guard) = self.guard.as_mut() {
-                return match guard.as_mut() {
-                    Err(err) => Poll::Ready(Err(err.clone().into())),
-                    Ok(ref mut svc) => match svc.poll_ready(cx) {
-                        Poll::Ready(Err(inner)) => {
-                            let error = ServiceError::new(Arc::new(inner.into()));
-                            **guard = Err(error.clone());
-
-                            // Drop the guard.
-                            self.guard = None;
-                            Poll::Ready(Err(error.into()))
-                        }
-                        Poll::Pending => {
-                            trace!(ready = false);
-                            Poll::Pending
-                        }
-                        Poll::Ready(Ok(())) => {
-                            trace!(ready = true);
-                            debug_assert!(self.guard.is_some());
-                            Poll::Ready(Ok(()))
-                        }
-                    },
-                };
-            }
-            debug_assert!(self.guard.is_none());
-
-            let guard = futures::ready!(self.lock.poll_acquire(cx));
-            self.guard = Some(guard);
-        }
-    }
-
-    fn call(&mut self, req: T) -> Self::Future {
-        trace!("Calling");
-        // The service must have been acquired by poll_ready. Reset this lock's
-        // state so that it must reacquire the service via poll_ready.
-        self.guard
-            .take()
-            .expect("Called before ready")
-            .as_mut()
-            .expect("Called before ready")
-            .call(req)
-            .map_err(Into::into)
-    }
-}
diff --git a/linkerd/lock/src/test.rs b/linkerd/lock/src/test.rs
deleted file mode 100644
index ca4be1e6dc..0000000000
--- a/linkerd/lock/src/test.rs
+++ /dev/null
@@ -1,280 +0,0 @@
-use crate::error::ServiceError;
-use crate::LockService;
-use futures::{future, StreamExt};
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::task::{Context, Poll};
-use std::{future::Future, pin::Pin, sync::Arc};
-use tokio::sync::oneshot;
-use tokio_test::{assert_pending, assert_ready, assert_ready_ok};
-use tower::Service as _Service;
-use tower_test::mock::Spawn;
-use tracing::{info_span, trace};
-use tracing_futures::Instrument;
-
-#[tokio::test]
-async fn exclusive_access() {
-    let ready = Arc::new(AtomicBool::new(false));
-    let mut svc0 = Spawn::new(LockService::new(Decr::new(2, ready.clone())));
-
-    // svc0 grabs the lock, but the inner service isn't ready.
-    assert_pending!(svc0.poll_ready());
-
-    // Cloning a locked service does not preserve the lock.
-    let mut svc1 = svc0.clone();
-
-    // svc1 can't grab the lock.
-    assert_pending!(svc1.poll_ready());
-
-    // svc0 holds the lock and becomes ready with the inner service.
-    ready.store(true, Ordering::SeqCst);
-    assert_ready_ok!(svc0.poll_ready());
-
-    // svc1 still can't grab the lock.
-    assert_pending!(svc1.poll_ready());
-
-    // svc0 remains ready.
-    let fut0 = svc0.call(1);
-
-    // svc1 grabs the lock and is immediately ready.
-    assert_ready_ok!(svc1.poll_ready());
-    // svc0 cannot grab the lock.
-    assert_pending!(svc0.poll_ready());
-
-    let fut1 = svc1.call(1);
-
-    tokio::try_join!(fut0, fut1).expect("must not fail!");
-}
-
-#[tokio::test]
-async fn propagates_errors() {
-    let mut svc0 = Spawn::new(LockService::new(Decr::from(1)));
-
-    // svc0 grabs the lock and we decr the service so it will fail.
-    assert_ready_ok!(svc0.poll_ready());
-
-    // svc0 remains ready.
-    let _ = svc0.call(1).await.expect("must not fail!");
-
-    // svc1 grabs the lock and fails immediately.
-    let mut svc1 = svc0.clone();
-    assert_ready!(svc1.poll_ready())
-        .expect_err("must fail")
-        .downcast_ref::<ServiceError>()
-        .expect("must fail with service error")
-        .inner()
-        .is::<Underflow>();
-
-    // svc0 suffers the same fate.
-    assert_ready!(svc0.poll_ready())
-        .expect_err("must fail")
-        .downcast_ref::<ServiceError>()
-        .expect("must fail with service error")
-        .inner()
-        .is::<Underflow>();
-}
-
-#[tokio::test]
-async fn dropping_releases_access() {
-    let _ = tracing_subscriber::fmt()
-        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-        .try_init();
-    use tower::util::ServiceExt;
-    let mut svc0 = LockService::new(Decr::new(3, Arc::new(true.into())));
-
-    // svc0 grabs the lock; the inner service is ready.
-    future::poll_fn(|cx| {
-        assert_ready_ok!(svc0.poll_ready(cx));
-        Poll::Ready(())
-    })
-    .await;
-
-    let svc1 = svc0.clone();
-
-    let (tx1, rx1) = oneshot::channel();
-    tokio::spawn(
-        async move {
-            let svc1 = svc1;
-            trace!("started");
-            let _f = svc1.oneshot(1).instrument(info_span!("1shot")).await;
-            trace!("sending");
-            tx1.send(()).unwrap();
-            trace!("done");
-        }
-        .instrument(info_span!("Svc1")),
-    );
-
-    let (tx2, rx2) = oneshot::channel();
-    let svc2 = svc0.clone();
-    tokio::spawn(
-        async move {
-            trace!("started");
-            let _ = svc2.oneshot(1).await;
-            trace!("sending");
-            tx2.send(()).unwrap();
-            trace!("done");
-        }
-        .instrument(info_span!("Svc2")),
-    );
-
-    // svc3 will be the notified waiter when svc0 completes; but it drops
-    // svc3 before polling the waiter. This test ensures that svc2 is
-    // notified by svc3's drop.
-    let svc3 = svc0.clone();
-    let (tx3, rx3) = oneshot::channel();
-    tokio::spawn(
-        async move {
-            let mut svc3 = Some(svc3);
-            let mut rx3 = rx3;
-            let _ = future::poll_fn(|cx| {
-                let rx3 = &mut rx3;
-                tokio::pin!(rx3);
-                trace!("Polling");
-                if let Poll::Ready(ready) = rx3.poll(cx) {
-                    trace!(?ready, "Dropping");
-                    drop(svc3.take());
-                    return Poll::Ready(Ok(()));
-                }
-                svc3.as_mut()
-                    .expect("polled after ready?")
-                    .poll_ready(cx)
-                    .map_err(|_| ())
-            })
-            .await;
-        }
-        .instrument(info_span!("Svc3")),
-    );
-
-    tokio::spawn(
-        async move {
-            trace!("started");
-            let _ = svc0.ready_and().await;
-            trace!("ready");
-            tx3.send(()).map_err(|_| ())?;
-            trace!("sent");
-            Ok::<(), ()>(())
-        }
-        .instrument(info_span!("Svc0")),
-    );
-    // svc3 notified; but it is dropped before it can be polled
-
-    rx2.await.unwrap();
-    rx1.await.unwrap();
-}
-
-#[tokio::test]
-async fn fuzz() {
-    let _ = tracing_subscriber::fmt()
-        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
-        .try_init();
-    const ITERS: usize = 100_000;
-    for (concurrency, iterations) in &[(1usize, ITERS), (3, ITERS), (100, ITERS)] {
-        async {
-            tracing::info!("starting");
-            let svc = LockService::new(Decr::new(*iterations, Arc::new(true.into())));
-            let joins = futures::stream::futures_unordered::FuturesUnordered::new();
-            for i in 0..*concurrency {
-                let lock = svc.clone();
-                let join = tokio::spawn(
-                    async move {
-                        let mut lock = lock;
-                        future::poll_fn(|cx| {
-                            loop {
-                                let ready = lock.poll_ready(cx);
-                                tracing::trace!(?ready);
-                                if futures::ready!(ready).is_err() {
-                                    return Poll::Ready(());
-                                }
-                                // Randomly be busy while holding the lock.
-                                if rand::random::<bool>() {
-                                    cx.waker().wake_by_ref();
-                                    return Poll::Pending;
-                                }
-
-                                tokio::spawn(lock.call(1));
-                            }
-                        })
-                        .await;
-                        tracing::trace!("task done");
-                    }
-                    .instrument(tracing::trace_span!("task", number = i)),
-                );
-                joins.push(join);
-            }
-
-            joins
-                .for_each(|res| {
-                    let _ = res.expect("task must not have panicked");
-                    async {}
-                })
-                .await;
-            tracing::info!("done");
-        }
-        .instrument(tracing::info_span!("fuzz", concurrency, iterations))
-        .await
-    }
-}
-
-#[derive(Debug, Default)]
-struct Decr {
-    value: usize,
-    ready: Arc<AtomicBool>,
-}
-
-#[derive(Copy, Clone, Debug)]
-struct Underflow;
-
-impl From<usize> for Decr {
-    fn from(value: usize) -> Self {
-        Self::new(value, Arc::new(AtomicBool::new(true)))
-    }
-}
-
-impl Decr {
-    fn new(value: usize, ready: Arc<AtomicBool>) -> Self {
-        Decr { value, ready }
-    }
-}
-
-impl tower::Service<usize> for Decr {
-    type Response = usize;
-    type Error = Underflow;
-    type Future =
-        Pin<Box<dyn Future<Output = Result<usize, Underflow>> + Send + Sync + 'static>>;
-
-    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        let span = tracing::trace_span!("Decr::poll_ready", self.value);
-        let _g = span.enter();
-        if self.value == 0 {
-            tracing::trace!(ready = true, "underflow");
-            return Poll::Ready(Err(Underflow));
-        }
-
-        if !self.ready.load(Ordering::SeqCst) {
-            tracing::trace!(ready = false);
-            return Poll::Pending;
-        }
-
-        tracing::trace!(ready = true);
-        Poll::Ready(Ok(()))
-    }
-
-    fn call(&mut self, decr: usize) -> Self::Future {
-        if self.value < decr {
-            self.value = 0;
-
-            return Box::pin(async { Err(Underflow) });
-        }
-
-        self.value -= decr;
-        let value = self.value;
-        Box::pin(async move { Ok(value) })
-    }
-}
-
-impl std::fmt::Display for Underflow {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "underflow")
-    }
-}
-
-impl std::error::Error for Underflow {}