作者:谢敬伟,江湖人称“刀哥”,20年IT老兵,数据通信网络专家,电信网络架构师,目前任Netwarps开发总监。刀哥在操作系统、网络编程、高并发、高吞吐、高可用性等领域有多年的实践经验,并对网络及编程等方面的新技术有浓厚的兴趣。
Send 与 Sync 可能是Rust多线程以及异步代码种最常见到的约束。在前面一篇讨论多线程的文章中介绍过这两个约束的由来。但是,真正书写比较复杂的代码时,还是会经常遇到编译器的各种不配合。这里借用我的同事遇到的一个问题再次举例谈一谈 Send 与 Sync 的故事。
基本场景
C/C++中不存在Send/Sync的概念,数据对象可以任意在多线程中访问,只不过需要程序员保证线程安全,也就是所谓“加锁”。而在Rust中,由于所有权的设计,不能直接将一个对象分成两份或多份,每个线程都放一份。一般地,如果一份数据仅仅子线程使用,我们会将数据的值转移至线程中,这也是Send的基础含义。因此,Rust代码经常会看到将数据clone(),然后move到线程中:
let b = aa.clone();
thread::spawn(move || {
b...
})
假如,数据需要在多线程共享,情况会复杂一些。我们一般不会在线程中直接使用外部环境变量引用。原因很简单,生命周期的问题。线程的闭包要求‘static,这会与被借用的外部环境变量的生命周期冲突,错误代码如下:
let bb = AA::new(8);
thread::spawn( || {
let cc = &bb; //closure may outlive the current function, but it borrows `bb`, which is owned by the current function
});
包裹一个Arc可以解决这个问题,Arc恰好就是用来管理生命周期的,改进后的代码如下:
let b = Arc::new(aa);
let b1 = b.clone();
thread::spawn(move || {
b1...
})
Arc提供了共享不可变引用的功能,也就是说,数据是只读的。如果我们需要访问多线程访问共享数据的可变引用,即读写数据,那么还需要在原始数据上先包裹Mutex<T>
,类似于RefCell<T>
,提供内部可变性,因此我们可以获取内部数据的&mut,修改数据。当然,这需要通过Mutex::lock() 来操作。
let b = Arc::new(Mutex::new(aa));
let b1 = b.clone();
thread::spawn(move || {
let b = b1.lock();
...
})
为什么不能直接使用RefCell完成这个功能?这是因为RefCell不支持 Sync,没办法装入Arc。注意Arc的约束:
unsafe impl<T: ?Sized + Sync + Send> Send for Arc<T> {}
若 Arc<T>
是Send,条件是 T:Send+Sync。RefCell不满足 Sync,因此 Arc<RefCell<>> 不满足Send,无法转移至线程中。错误代码如下:
let b = Arc::new(RefCell::new(aa));
let b1 = b.clone();
thread::spawn(move || {
^^^^^^^^^^^^^ `std::cell::RefCell<AA<T>>` cannot be shared between threads safely
let x = b1.borrow_mut();
})
异步代码:跨越 await 问题
如上所述,一般地,我们会将数据的值转移入线程,这样只需要做正确的 Send和Sync 标记即可,很直观,容易理解。典型的代码如下:
fn test1<T: Send + Sync + 'static>(t: T) {
let b = Arc::new(t);
let bb = b.clone();
thread::spawn( move|| {
let cc = &bb;
});
}
根据上面的分析,不难推导出条件 T: Send + Sync + 'static 的来龙去脉:Closure: Send + 'static ⇒ Arc<T>
: Send + ’static ⇒ T: Send + Sync + 'static。
然而,在异步协程代码中有一种常见情况,推导过程则显得比较隐蔽,值得说道说道。考察以下代码:
struct AA<T>(T);
impl<T> AA<T> {
async fn run_self(self) {}
async fn run(&self) {}
async fn run_mut(&mut self) {}
}
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run_self().await;
});
}
test2 中,限定 T: Send + ‘static,合情合理。async fn 生成的 GenFuture 要求 Send + ‘static,因此被捕获置入 GenFuture 匿名结构中的 AA 也必须满足 Send + ‘static,进而要求AA 泛型参数也满足Send + ‘static。
然而,类似的方式调用 AA::run() 方法,编译失败,编译器提示 GenFuture 不满足 Send。代码如下:
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
^^^^^^^^^^^^^^^^^^^^^^ future returned by `test2` is not `Send`
aa.run().await;
});
}
原因在于,AA::run()方法的签名是 &self,所以run()是通过 aa 的不可变借用 &AA 来调用。而run()又是一个异步方法,执行了await,也就是所谓的&aa 跨越了 await,故而要求GenFuture匿名结构除了生成aa之外,还需要生成 &aa,示意代码如下:
struct {
aa: AA
aa_ref: &AA
}
正如之前探讨过,生成的 GenFuture需要满足 Send,因此 AA 以及 &AA 都需要满足 Send。而&AA满足 Send,则意味着 AA 满足 Sync。这也就是各种 Rust教程中都会提到的那句话的真正含义:
对于任意类型 T,如果 &T是 Send ,T 就是 Sync 的
之前出错的代码修改为如下形式,增加 Sync标记,编译通过。
fn test2<T: Send + Sync + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run().await;
});
}
另外,值得指出的是上述代码中调用 AA::run_mut(&mut self) 不需要 Sync 标记:
fn test2<T: Send + 'static>(mut aa: AA<T>) {
let ha = async_std::task::spawn(async move {
aa.run_mut().await;
});
}
这是因为 &mut self 并不要求 T: Sync。参见以下标准库中关于Sync定义代码就明白了:
mod impls {
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Sync + ?Sized> Send for &T {}
#[stable(feature = "rust1", since = "1.0.0")]
unsafe impl<T: Send + ?Sized> Send for &mut T {}
}
可以看到,&T: Send 要求 T: Sync,而 &mut T 则 T: Send 即可。
总结
总而言之,Send约束在根源上是由 thread::spawn() 或是 task::spawn() 引入的,因为两个方法的闭包参数必须满足 Send。此外,在需要共享数据时使用Arc<T>
会要求 T: Send + Sync。而共享可写数据,需要Arc<Mutex<T>>
,此时 T: Send 即可,不再要求Sync。
异步代码中关于 Send/Sync 与同步多线程代码没有不同。只是因为GenFuture 的特别之处使得跨越 await 的变量必须是 T: Send,此时需要注意通过 T 调用异步方法的签名,如果为 &self,则必须满足 T:Send + Sync。
最后,一点经验分享:关于 Send/Sync 的道理并不复杂,更多时候是因为代码中层次比较深,调用关系复杂,导致编译器的错误提示很难看懂,某些特定场合编译器可能还会给出完全错误的修正建议,这时候需要仔细斟酌,追根溯源,找到问题的本质,不能完全依靠编译器提示。
深圳星链网科科技有限公司(Netwarps),专注于互联网安全存储领域技术的研发与应用,是先进的安全存储基础设施提供商,主要产品有去中心化文件系统(DFS)、企业联盟链平台(EAC)、区块链操作系统(BOS)。
微信公众号:Netwarps
有疑问加站长微信联系(非本文作者)