假设我们想在异步代码中共享状态
传统的互斥锁有两个方法(这里“传统”意味着与Rust不同):lock和unlock。代码锁定mutex,执行某些操作,然后解锁互斥锁。如果其他代码尝试锁定互斥锁,则会阻塞,直到互斥锁解锁。
我们可以通过下面的假想类型来想象rust中的流程:
fn exclusive_access(mutex: &MyMutex, protected: &MyObject) {
mutex.lock();
protected.do_something();
mutex.unlock();
}
我们必须相信在访问MyObject的任何地方,都锁定了相同的mutex。
而且你可能会忘记在完成后解锁互斥锁。
更可怕的是,在 do_something()
出错时提前返回,这样永远也无法解锁互斥锁了。
在Rust中,mutex和它保护的对象是绑定在一起的,这就是 std::sync::Mutex
。
当你尝试锁定互斥锁时,你会得到一个MutexGuard。你可以通过它访问被保护的数据。
当MutexGuard超出作用范围时,互斥锁就自动解锁,这种行为叫做RAII。
这就意味着只有我们能成功锁定mutex时,才能得到一个MutexGuard。
我们用Rust标准库来实现上面代码的功能:
fn exclusive_access(mutex: &std::sync::Mutex<MyObject>) {
let guard = mutex
.lock()
.expect("the mutex is poisoned");
// MutexGuard会自动解引用,所以我们可以直接调用MyObject方法
guard.do_something();
}
我们无法在没有锁定mutex的情况下意外访问MyObject,我们也无需考虑解锁mutex。
除非持有MutexGuard的线程发生了panic,那么当MutexGuard被释放时,mutex不会自动解锁,而会标记为“poisoning”。
我们不会深入讨论mutex poisoning的情况,但是你可以在这里找到更多信息:https://doc.rust-lang.org/std/sync/struct.Mutex.html#poisoning
让我们想象两个线程访问我们保护的对象。
这里简化了一些细节。例如,我们不能直接在创建两个线程时传递一个mutex,而是用智能指针将它包装起来。不过这对本例而言不重要。
现在,让我们回到在异步上下文中使用mutex保护(这里指错误地使用它)。
我们假设有一个例子,需要在await点持有一个mutex。
Note
await点指异步函数中遇到await时发生暂停的那个点
这可能包括
- 读取共享计数器
- 访问异步共享资源
- 写入新的值到共享计数器
现在,我们用让出执行权给运行时来模拟异步共享资源。
这是我们的异步函数:
use std::sync::{Arc, Mutex};
async fn hold_mutex_guard(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {
let mut guard = data.lock().map_err(|_| DataAccessError {})?;
println!("existing value: {}", *guard);
tokio::task::yield_now().await;
*guard = *guard + 1;
println!("new value: {}", *guard);
Ok(())
}
- 异步函数接受一个数据参数,它是一个
Arc<Mutex<u64>>
类型 Mutex<u64>
表示有一个u64
类型的数据被Mutex
保护,只能通过锁进行访问Arc
表示这是一个原子引用计数的共享指针,可以在多个任务间共享Mutex
。- 首先我们锁定
Mutex
获取MutexGuard
- 打印共享数据值
- 在await点让出执行权,模拟异步操作
- 修改共享函数的值
- 打印修改后的值
- 完成
锁定mutex锁可能会失败(mutex可能会poisoning),所以在这种情况下,我们应该返回一个错误。
我们为这种情况定义了一个新的类型:
use std::{error::Error, fmt};
#[derive(Debug)]
struct DataAccessError {}
impl fmt::Display for DataAccessError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "there was an error accessing the shared data")
}
}
impl Error for DataAccessError {}
现在,让我们来运行这段代码看看。
让我们调用我们的future,或者说await它。
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0_u64));
hold_mutex_guard(Arc::clone(&data))
.await
.expect("failed to perform operation");
}
输出:
existing value: 0
new value: 1
请记住,它在await点上持有一个互斥锁!我们需要更努力地尝试“做坏事”。
我们不能连续两次等待我们的异步函数,因为这样它会依次运行。
然而,有一种方法可以在异步运行时并发运行多个future.它被称为spawn。在tokio中,它是 tokio::spawn
。
使用 tokio::spawn
时,你不需要 .await
。future将被设置为立即在新任务中执行。
但是,新任务可能不会立即被轮询。这取决于异步运行时工作线程的忙碌程度。
让我们创建一个简单的示例:
// `flavor`` has to be one of these values, not both. This code won't compile.
#[tokio::main(flavor = "current_thread|multi_thread")]
async fn main() {
tokio::spawn(spawn_again());
do_nothing().await;
tokio::task::yield_now().await
tokio::task::yield_now().await
// ... Let's pretend there's more here and we're not returning yet.
}
async fn spawn_again() {
tokio::spawn(do_nothing());
}
async fn do_nothing() {
// There's nothing here
}
这里我们的async main使用 spawn_again
函数生成一个任务。
然后,它等待一个名为 do_nothing
的异步函数。
异步函数 spawn_again
使用 do_nothing
生成一个任务。
让我们看看这在不同的运行时调度器下会如何工作。
一个异步运行时可能只有一个工作线程。例如,在tokio中,current thread scheduler就是这样的。然后我们可以在另一个任务内部启动给一个任务。但是,直到当前任务让出控制权给调度器,这个任务才会被轮询。
这是时序图:
注意,被启动的任务需要等待运行时空闲后才能被轮询。当一个任务等待一个future时,并且没有新任务产生,它会立即被轮询。
- spawn: 创建新任务,但可能要等待运行时轮询
- await:在当前任务中执行,立即轮询,并暂停当前任务
另一种情况是,异步运行时有多个工作线程。例如,在tokio中,multi thread scheduler就是这样的。这意味着我们可以在另一个任务内部启动一个任务,而不必等待运行时空闲。
让我们以有两个工作线程的运行时为例,看看时序图会是什么样子。注意现在有了并行,所以操作的确切顺序可能有所不同。
实际上,任务是在同一个工作线程上启动的,该工作线程正在运行生成任务的任务。如果另一个工作线程空闲,它可能会从第一个工作线程的队列中窃取任务(这部分内容超出本文讨论范围)。
启动操作返回一个连接句柄:tokio::task::JoinHandle
。我们可以使用它来等待任务完成。它也可以来中止启动的任务。
现在让我们回到尝试破坏的过程中。
让我们生成几个异步函数实例:
#[tokio::main]
async fn main() {
let data = Arc::new(Mutex::new(0_u64));
tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
}
这会编译失败,并出现大量错误。
error: future cannot be sent between threads safely
--> resources/understanding-async-await/src/bin/mutex_guard_async.rs:5:18
|
5 | tokio::spawn(hold_mutex_guard(Arc::clone(&data)));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ future returned by `hold_mutex_guard` is not `Send`
|
= help: within `impl Future<Output = Result<(), DataAccessError>>`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, u64>`
note: future is not `Send` as this value is used across an await
--> resources/understanding-async-await/src/bin/mutex_guard_async.rs:15:29
|
12 | let mut guard = data.lock().map_err(|_| DataAccessError {})?;
| --------- has type `std::sync::MutexGuard<'_, u64>` which is not `Send`
...
15 | tokio::task::yield_now().await;
| ^^^^^^ await occurs here, with `mut guard` maybe used later
...
21 | }
| - `mut guard` is later dropped here
note: required by a bound in `tokio::spawn`
--> /Users/stainsby/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.27.0/src/task/spawn.rs:163:21
|
163 | T: Future + Send + 'static,
| ^^^^ required by this bound in `spawn`
其实只有一个错误:Future不能安全地在线程间传递。
生成的任务可能在任何工作线程上启动。即使是当前线程的运行时,也可能有来自其他线程的任务生成。
所以future需要能够在线程间传递,但为什么不能呢?
note1:
note: future is not `Send` as this value is used across an await
它指向了mut guard,并告诉我们它不是Send,然后指向我们yield的 .await
作为有问题的await点。
note2:
note: requited by a bound in `tokio::spawn`
这个Send的东西并不是魔法,它是由tokio在 tokio::spawn
中显式指定的。
tokio::spawn
的代码:
pub fn spawn<T>(future: T) -> JoinHandle<T::Output>
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
// (we're skipping the actual implementation)
}
我们可以看到T必须满需三个条件:
- 实现Future trait
- 实现Send trait
- 拥有 'static 生命周期
我们知道不能在线程见传递数据,那为什么future不可以呢?
那么,一个类型要怎么实现Send呢?
Rust中有一些特殊的trait,它们不需要任何方法,只是用来标记类型。
这些trait被称为标记trait(marker traits)。
如果我们查看 std::marker::Send
trait,我们会发现它是unsafe的!
它遵循了rust的约定:当编译器无法判断是否安全时,用unsafe来表示我们担保其安全性。
默认情况下,如果一个结构体里所有成员都是Send的,那么它也是Send的。所以,通常我们不用关心手动标记Send。但是,我们需要警惕在哪些地方不能使用非Send类型。
我们不需要并行运行非Send的异步函数,只需要从别的地方尝试锁定mutex就行。
所以,让我们创建一个可以被生成的异步函数。它和之前的函数类似,但是没有yield(所以没有await点,也就不算真正的异步函数):
async fn yieldless_mutex_access(data: Arc<Mutex<u64>>) -> Result<(), DataAccessError> {
let mut guard = data.lock().map_err(|_| DataAccessError {})?;
println!("existing value: {}", *guard);
*guard = *guard + 1;
println!("new value: {}", *guard);
Ok(())
}
我们没有在await点上持有MutexGuard,所以它是Send的。
为了确保会出问题,我们还需要做出一个改动:使用当前线程运行时。这意味着任务不会并行运行,所以更容易制造某些情况:
#[tokio::main(flavor = "current_thread")]
async fn main() {
let data = Arc::new(Mutex::new(0_u64));
tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));
hold_mutex_guard(Arc::clone(&data))
.await
.expect("failed to perform operation");
}
这里我们生成了Send的异步函数 yieldless_mutex_access
,然后await了我们的坏函数 hold_mutex_guard
。
输出:
existing value: 0
然后它就卡住了。
我们成功制造了死锁。
我们将手动实现一个执行相同操作的future。
future通常以状态机的方式实现。我们需要一个初始状态,最好还有一个明确的完成状态,中间还需要一个执行过一次yield后的状态。
我们的Future可以如下:
use std::sync::{Arc, Mutex};
enum HoldMutexGuard<'a> {
Init {
data: Arc<Mutex<u64>>,
},
Yielded {
guard: MutexGuard<'a, u64>,
_data: Arc<Mutex<u64>>,
},
Done,
}
我们把即将成为Future的东西封装在一个函数中:
fn hold_mutex_guard(
data: Arc<Mutex<u64>>,
) -> impl Future<Output = Result<(), DataAccessError>> {
HoldMutexGuard::Init { data }
}
状态机如下:
Future从Init状态开始,第一次轮询时返回 Poll::Pending
并转移到 Yielded
状态。第二次轮询时返回 Poll::Ready
并转移到 Done
状态。
然而,实现起来会稍微复杂一点。
impl<'a> Future for HoldMutexGuard<'a> {
type Output = Result<(), DataAccessError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let state = &mut *self;
match state {
Self::Init { data } => {
let guard = unsafe {
// SAFETY: We will hold on to the Arc containing the mutex as long
// as we hold onto the guard.
std::mem::transmute::<MutexGuard<'_, u64>, MutexGuard<'static, u64>>(
data.lock().map_err(|_| DataAccessError {})?,
)
};
println!("existing value: {}", *guard);
cx.waker().wake_by_ref();
*state = Self::Yielded {
guard: guard,
_data: Arc::clone(data),
};
Poll::Pending
}
Self::Yielded { guard, _data } => {
println!("new value: {}", *guard);
*state = Self::Done;
Poll::Ready(Ok(()))
}
Self::Done => panic!("Please stop polling me!"),
}
}
}
现在,我们看一下 poll()
的实现。
等等,这是什么?
let state = &mut *self;
借用检查器会对任何与Pin有关的东西进行严格检查。我们需要修改self,但它被pin了,我们也需要引用它的部分。所以我们解引用pinned的self,然后获取一个可变引用。
第一次被轮询时,我们处于Init状态,所以执行到async函数中的yield_now调用。
不幸的是,我们不能直接将MutexGuard存储到它所保护的Mutex中。这会导致循环引用结构。Rust强烈反对这种结构,我们不得不使用unsafe来实现我们想要的功能。
我们需要将MutexGuard转换为一个拥有'static生命周期的对象。我们可以使用 std::mem::transmute
来做到这一点。
一旦我们有了MutexGuard,我们就打印该值。现在我们要返回给运行时。 所以就像我们的 YieldNow
future中一样,我们需要先唤醒我们的waker。 否则我们的future将永远不会被再次轮询。 然后我们设置下一个状态:Yielded
。(使用那个有趣的 &mut *self
)并返回 Poll::Pending
。
下次对我们的future轮询时,我们已经处于Yielded状态。我们将打印 MutexGuard 中的值。然后进入 Done 状态并返回 Poll::Ready
。此时,MutexGuard 将被丢弃。至此,实现就结束了。
这里重要的一点是,在 Yielded 状态下,我们持有 MutexGuard 并返回。 这也是我们的异步函数正在做的事情。 但我们并没有看得那么清楚。 我们只是看到.await
。 但每当你的异步函数包含一个await点时,那就是future潜在的返回。 在返回之前,它必须将所有范围内的局部变量存储在自身中。
让我们用我们的Future再次重现那个挂起的程序吧。
我们将生成相同的异步函数yieldless_mutex_access
来帮助引发挂起(实际上不做任何异步事情),并直接展开 async main()
:
fn main() {
let body = async {
let data = Arc::new(Mutex::new(0_u64));
tokio::spawn(yieldless_mutex_access(Arc::clone(&data)));
hold_mutex_guard(Arc::clone(&data))
.await
.expect("failed to perform operation");
};
return tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build runtime")
.block_on(body);
}
我们正在创建一个当前线程运行时,和之前一样。这使得触发悬挂行为更容易。
我们来看看时序图吧!
重要的一点是两个Future。
首先生成 yieldless_mutex_access()
。
然后等待 HoldMutexGuard
。
正如我们在引入spawn时所看到的,新任务必须等待。运行时是单线程的,因此,使用 yieldless_mutex_access()
创建的新任务必须等待,直到当前任务向运行时转移执行权。
这意味着 HoldMutexGuard
future 首先运行。它锁定互斥体并接收 MutexGuard。它唤醒了它的waker(所以返回 Poll::Pending
后会再次进行轮询),然后将状态更改为 Yielded
,将 MutexGuard
存储在其自身中,然后返回 Poll::Pending,向运行时转移执行权。
现在运行时可以轮询下一个任务,即由 yieldless_mutex_access()
产生的。此任务禅师锁定mutex,但mutex已经被锁定,因此它会阻塞,直到解锁为止。由于运行时只有一个线程,这会阻塞整个运行时,并造成死锁。
现在我们明白为什么了!
持有MutexGuard在await点上是安全的,因为它的 lock()
方法是异步的,在等待解锁时不会阻塞线程,这样其他持有锁的任务就可以继续执行。
然而,最好的方式是不使用mutex。应该将共享资源的完整所有权交给单个任务,并通过消息传递与该任务进行通信。
在第四部分,我们将研究消息传递和通道。