1. Rust 异步编程简介
1.1 定义和目的
Rust 的异步编程模型允许代码在等待 I/O 或其他操作时不阻塞线程,而是通过 Future 来表示将来完成的值。核心是 async
关键字,用于定义异步函数或块,返回一个 Future
。 目的:实现高效的并发 I/O,避免线程阻塞,提高吞吐量,尤其在服务器或网络应用中。 与同步代码不同,异步代码不立即执行,而是生成一个可轮询(poll)的 Future。
Rust 异步的核心组件:
- Future trait:表示异步计算,返回 Poll::Pending 或 Poll::Ready。
- async/await:语法糖,使异步代码像同步一样书写。
- 运行时:如 Tokio,提供 executor 执行 Future。
1.2 与同步编程的区别
- 同步:代码顺序执行,I/O 阻塞线程。
- 异步:代码非阻塞,等待时切换任务,提高效率。
- 线程模型:同步用多线程;异步用单线程或少线程 + event loop。
- 错误处理:异步用 Result
或 anyhow;同步用 ?。
何时选择异步? 当程序有大量 I/O 操作时,如 web server;同步适合 CPU 密集任务。
2. 基础语法和概念
2.1 Future Trait
Future 是异步计算的核心:
#![allow(unused)] fn main() { use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; struct MyFuture { value: i32, } impl Future for MyFuture { type Output = i32; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { if self.value == 0 { Poll::Pending } else { self.value -= 1; if self.value == 0 { Poll::Ready(42) } else { Poll::Pending } } } } }
poll
方法检查 Future 是否就绪。
2.2 async/await 语法
async 函数返回 Future:
async fn fetch_data() -> Result<String, Box<dyn std::error::Error>> { // 模拟 I/O Ok("data".to_string()) } #[tokio::main] async fn main() { let data = fetch_data().await.unwrap(); println!("{}", data); }
await
等待 Future 完成。
2.3 运行时:Tokio 示例
安装 Tokio:cargo add tokio --features full
use tokio::net::TcpListener; #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, _) = listener.accept().await?; tokio::spawn(async move { // 处理 socket }); } }
- Tokio 提供 executor 和 I/O 原语。
3. Pinning 和 Unpin
异步代码可能生成 self-referential Future,需要 Pin 固定内存。
3.1 Pin 示例
#![allow(unused)] fn main() { use std::pin::Pin; use std::task::{Context, Poll}; use std::future::Future; struct Delay { remaining: u32, } impl Future for Delay { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { if self.remaining == 0 { Poll::Ready(()) } else { self.remaining -= 1; cx.waker().wake_by_ref(); Poll::Pending } } } }
Pin<&mut Self>
确保不移动。
3.2 Unpin 类型
大多数类型自动 Unpin,不需 Pin。
4. 并发原语
4.1 Channels
use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); tokio::spawn(async move { tx.send("hello").await.unwrap(); }); println!("{}", rx.recv().await.unwrap()); }
- 多生产者单消费者。
4.2 Mutex 和 Arc
use std::sync::Arc; use tokio::sync::Mutex; #[tokio::main] async fn main() { let counter = Arc::new(Mutex::new(0)); let counter_clone = counter.clone(); tokio::spawn(async move { *counter_clone.lock().await += 1; }); *counter.lock().await += 1; println!("{}", *counter.lock().await); }
- 线程安全共享。
5. 错误处理
使用 anyhow 或 thiserror 处理异步错误:
use anyhow::{Result, anyhow}; async fn fetch() -> Result<String> { Err(anyhow!("Error")) } #[tokio::main] async fn main() -> Result<()> { fetch().await?; Ok(()) }
?
在 async 中传播错误。
6. Streams 和 Sinks
使用 futures 或 tokio_stream 处理流:
use tokio_stream::StreamExt; use tokio::sync::mpsc; #[tokio::main] async fn main() { let (tx, mut rx) = mpsc::channel(32); tx.send(1).await.unwrap(); tx.send(2).await.unwrap(); drop(tx); while let Some(item) = rx.recv().await { println!("{}", item); } }
- 处理异步流。
7. 高级主题
7.1 Async Traits
在 trait 中定义 async fn:
trait AsyncService { async fn handle(&self, req: String) -> String; } struct Service; impl AsyncService for Service { async fn handle(&self, req: String) -> String { format!("Handled: {}", req) } } #[tokio::main] async fn main() { let s = Service; println!("{}", s.handle("request".to_string()).await); // Handled: request }
- 自 Rust 1.75,支持 async fn in traits。
7.2 Select 和 Join
使用 tokio::select! 处理多个 Future:
use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { tokio::select! { _ = sleep(Duration::from_secs(1)) => println!("Timer 1"), _ = sleep(Duration::from_secs(2)) => println!("Timer 2"), }; }
- 等待第一个完成。
8. 用例
- Web 服务器:处理并发请求。
- I/O 操作:文件、网络非阻塞。
- 数据库查询:异步连接。
- GUI 事件:非阻塞 UI。
- 微服务:高吞吐 API。
9. 最佳实践
- 选择运行时:Tokio 适合生产;async-std 简单。
- 处理错误:用 anyhow 简化。
- 避免阻塞:用 async 原语替换 sync。
- Pinning 处理:了解 Unpin 类型。
- 测试:用 tokio::test 测试 async。
- 文档:说明 lifetime 和 Send/Sync。
10. 常见陷阱和错误
- 阻塞代码:sync I/O 在 async 中阻塞运行时;用 async 版本。
- Lifetime 错误:async 借用需 'static 或 scoped。
- Pinning 遗忘:!Unpin Future 需 Pin;处理或用 Unpin 类型。
- 运行时缺失:async fn 需 executor 如 tokio::main。
- 取消安全:async 代码需考虑取消;用 drop 处理。