异步 Asynchronous
异步解决的是并发问题,而不是并行,那是多线程的活儿。
异步 (Async) ✅ 解决高并发 I/O 密集型问题
- 核心:用少量 OS 线程处理大量任务 (如网络请求、文件读写)。
- 机制: 当任务(future类型,如网络请求)等待 I/O 时,运行时将其挂起,线程转去执行其他就绪任务。由运行时自动调度任务的恢复(协作式多任务)。
- Rust 实现:
async/await+ 异步运行时 (如tokio,async-std)。
多线程 (Multithreading) ✅ 解决 CPU 密集型并行计算问题
- 核心:利用多核 CPU 同时执行计算 (如图像处理、数值模拟)。
- 机制: OS 直接管理多个线程,在 CPU 核心上真正并行运行 (抢占式多任务)。
- Rust 实现:
std::thread创建线程 + 同步原语 (Mutex,Arc等) 管理共享数据。
核心元素
- Future(未来量):一个可能现在还未准备好(就绪),但将来会准备好(就绪)的值
- 在其他语言中也称为 task 或 promise.
- 在 Rust 中,Future 是实现了 Future trait 的类型
- async 关键字:用于代码块或函数,来表示可被中断和恢复
- 将函数或代码块转换为返回 Future 的形式(async 整体就是一个 future 类型)
- await 关键字: 用于等待 Future 就绪
- 提供暂停和恢复执行的点
- 通过**“轮询”(polling) **检查 Future 值是否可用
rust
use trpl::{Either, Html};
fn main() {
// 从命令行获取参数,args[0] 是程序名,后面是用户输入的内容
let args: Vec<String> = std::env::args().collect();
// trpl::run 接受一个异步块(future),负责启动异步运行时并执行其中的代码
// 这里我们把主要逻辑放到一个 async 块里,然后通过 run 来运行
trpl::run(async {
// 创建两个 future,不会立刻发送请求。只是表示“将来会做某事”的值。
// page_title 返回一个 future,当我们 await 它时才会真正开始请求和处理。
let title_fut_1 = page_title(&args[1]);
let title_fut_2 = page_title(&args[2]);
// trpl::race 接受两个 future,返回最快完成的那个结果。
// 这里 .await 会挂起当前异步任务,直到其中一个 future 完成。
// race 返回 Either::Left 或 Either::Right,用来区分是第一个 future 先完成还是第二个先完成。
let (url, maybe_title) =
match trpl::race(title_fut_1, title_fut_2).await {
// 如果左边的 future 先完成,就进入这里
Either::Left(left) => left,
// 如果右边的 future 先完成,就进入这里
Either::Right(right) => right,
};
// 打印哪个 URL 最先返回
println!("{url} returned first");
// 对解析到的标题做进一步处理
match maybe_title {
Some(title) => println!("Its page title is: '{title}'"),
None => println!("Its title could not be parsed."),
}
})
}
// 这是一个异步函数,返回类型是 Future,实际类型是 impl Future<Output = (&str, Option<String>)>
// page_title 函数表示:给定一个 URL,异步地去获取页面文本,然后尝试解析<title>标签
async fn page_title(url: &str) -> (&str, Option<String>) {
// trpl::get(url).await 表示发送 HTTP 请求,.await 表示要等网络请求完成
// text().await 表示等到响应体内容被读取完,得到一个 String
let text = trpl::get(url).await.text().await;
// 解析 HTML:Html::parse(&text) 生成一个文档对象
// select_first("title") 找到第一个<title>元素,如果有就返回元素对象,否则返回 Err
// map 把找到的元素转换成 inner_html(实际标题内容)
let title = Html::parse(&text)
.select_first("title")
.map(|title| title.inner_html());
// 返回一个元组:原始 URL(&str),以及可能存在的标题(Option<String>)
(url, title)
}补充理解:
- 在 async 块里,当遇到 .await 时,异步运行时会收回控制权去执行另一个 async 块(因为等待 I/O 响应太费时间了,先去干别的,让cpu不“空转”)
- 在同一个 async 块中,代码是顺序执行的(就像单线程那样),只有等 .await 代码有结果了才会继续往下执行(并非遇到 .await 就跳过去了)
- 异步运行时在切换 async 块时会检查 .await 代码是否已完成。若已完成,则会从该代码块的暂停处(也就是.await)继续执行
把 Async 用于并发的小细节
对应文档:使用 Async 应用并发
(略)
处理多个 Futures / 多 Async 注意事项
- 在同一个阻塞块(用于存放多个异步任务)中,若一个 async 块中不存在 .await 语句
- 会导致异步运行时无法介入,从而使其他 Async 块被饿死(不会被执行/执行不完全)
- 可时不时插入
trpl::yield now().await;(这一句仅适用于 trpl 提供的运行时)语句让出控制权
流(Streams)
Rust的流(Stream)就像一条懒洋洋的传送带,它慢悠悠地吐着未来值(Future),你得用
.await像钩子一样把值拽下来,它天生就是异步的,不像迭代器那样急吼吼地当场算完,而是像会摸鱼的打工人,不到最后一刻绝不干活,必须搭配StreamExt工具箱里的map/filter/fold这些扳手才能驯服,本质上就是个披着异步外衣的迭代器,专治各种IO密集型的慢性子任务。
- 流(个人理解) = 异步版迭代器 + 消息传递机制
- 流其实是一个数据流整体,流也有发送端和接收端(就像线程之间的消息传递那样),数据在发送后、被接收前需要排队
- 流的上游发送端是一个惰性的迭代器,只有被下游催促才会发出数据
- 在异步运行时切换到下游任务之前,数据会排队等候
- 就像线程之间传递那样,流是在同一个异步运行时中的异步任务(async 块)之间进行传递
- 不同的是,迭代器是主动提交,而且是一口气交完。而流,下游需要提交申请
.next(),而且在 .await.next().await()后,运行时才会切过去发送、执行到上游的.await时再切回来接收,再再继续执行下游的后续代码
rust
// 官方文档里的代码太乱了,以下代码使用AI分析页面、合并整理,可能会出现顺序错误等问题
// 导入必要的模块和trait
use std::{pin::pin, time::Duration};
use trpl::{channel, run, sleep, spawn_task, ReceiverStream, Stream, StreamExt};
// 引入必要的trait和类型
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
// 使用trpl提供的异步运行时运行程序
trpl::run(async {
// 创建带超时的消息流(200毫秒超时)
let messages = get_messages().timeout(Duration::from_millis(200));
// 创建间隔流,转换为字符串,限流(100毫秒),设置10秒超时
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.throttle(Duration::from_millis(100))
.timeout(Duration::from_secs(10));
// 合并两个流
let merged = messages.merge(intervals).take(20);
// Pin住流以便安全轮询
let mut stream = pin!(merged);
// 处理合并流中的每个项
while let Some(result) = stream.next().await {
match result {
Ok(item) => println!("{item}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
});
}
/// 创建消息流(模拟异步消息到达)
fn get_messages() -> impl Stream<Item = String> {
// 创建无界信道(发送端和接收端)
let (tx, rx) = trpl::channel();
// 创建异步任务发送消息
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
// 遍历消息并添加随机延迟
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
// 发送消息并处理可能的错误
if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
eprintln!("Cannot send message '{message}': {send_error}");
break;
}
}
});
// 将接收端转换为流
ReceiverStream::new(rx)
}
/// 创建间隔计数器流(每毫秒计数)
fn get_intervals() -> impl Stream<Item = u32> {
// 创建无界信道
let (tx, rx) = trpl::channel();
// 创建异步任务发送计数
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
// 发送计数并处理可能的错误
if let Err(send_error) = tx.send(count) {
eprintln!("Could not send interval {count}: {send_error}");
break;
}
}
});
// 将接收端转换为流
ReceiverStream::new(rx)
}真实场景(瞎猜的)
上游等待网络文件下载或硬盘读取 - 异步运行时切换任务 - 下游请求数据 - 异步运行时切换任务 - 上游发送了手里拿到的部分文件数据 - (如此循环)
rust
// 以下代码由 deepseek-r1 兴致勃勃地实现(不确保可以运行)
use std::{pin::Pin, time::Duration};
use trpl::{Stream, StreamExt};
// 模拟从网络下载大文件的函数
async fn download_large_file() -> impl Stream<Item = Vec<u8>> {
// 创建异步信道(流的实现基础)
let (tx, rx) = trpl::channel();
// 启动独立任务处理下载
trpl::spawn_task(async move {
println!("[生产者] 开始下载文件...");
// 模拟大文件分块下载(假设文件有5个分块)
for chunk_index in 1..=5 {
// 模拟网络延迟(每次下载需要200ms)
trpl::sleep(Duration::from_millis(200)).await;
// 模拟获取数据块
let chunk = vec![0u8; 1024 * 1024]; // 1MB数据块
println!("[生产者] 已下载分块 {} ({}字节)", chunk_index, chunk.len());
// 将数据块发送到下游
if tx.send(chunk).is_err() {
println!("[生产者] 下游已停止接收,提前终止下载");
break;
}
}
println!("[生产者] 文件下载完成");
});
// 将接收端转换为流
ReceiverStream::new(rx)
}
#[tokio::main]
async fn main() {
println!("[主任务] 启动文件处理程序");
// 创建文件下载流
let mut download_stream = download_large_file().await;
// 处理下载的数据块
let mut processed_bytes = 0;
while let Some(chunk) = download_stream.next().await {
/*
此时运行时切换:
1. 下游(主任务)调用 next().await 请求数据
2. 运行时发现数据尚未就绪,挂起主任务
3. 运行时切换到下载任务执行
4. 下载任务完成当前分块后发送数据
5. 运行时唤醒主任务继续执行
*/
println!("[消费者] 接收到数据块 ({}字节)", chunk.len());
// 模拟数据处理(需要50ms)
trpl::sleep(Duration::from_millis(50)).await;
processed_bytes += chunk.len();
println!("[消费者] 已处理数据块,累计处理: {} MB", processed_bytes / (1024*1024));
}
println!("[主任务] 文件处理完成");
}Async 相关的 Traits
没搞懂,下面的笔记是AI总结的,来自官方文档。
Future Trait
表示一个异步计算,会在未来某个时刻完成并返回结果:
rust
use std::pin::Pin;
use std::task::{Context, Poll};
// 定义 Future trait
pub trait Future {
type Output; // 异步计算完成后返回的类型
// 检查异步操作是否完成
// self: Pin<&mut Self> - 确保 Future 不会被意外移动
// cx: &mut Context - 提供唤醒机制
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
// 示例:实现一个简单的 Future
struct ImmediateFuture(i32);
impl Future for ImmediateFuture {
type Output = i32;
fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
// 立即返回就绪状态和结果值
Poll::Ready(self.get_mut().0)
}
}
// 使用示例
async fn use_future() {
let fut = ImmediateFuture(42);
let result = fut.await; // await 会调用 poll 方法
println!("Result: {}", result); // 输出: Result: 42
}Pin 和 Unpin
用于处理自引用结构的内存安全问题:
rust
use std::pin::Pin;
// Unpin 标记 trait - 表示类型可以安全移动
struct SafeToMove(i32);
impl Unpin for SafeToMove {} // 自动实现
// !Unpin 类型 - 不能安全移动(编译器自动为异步块生成)
struct MySelfReferential {
data: i32,
self_ref: *const i32, // 指向自身的指针
}
impl !Unpin for MySelfReferential {} // 明确标记为不可移动
// 使用示例
fn pin_example() {
// 创建自引用结构
let mut data = MySelfReferential {
data: 42,
self_ref: std::ptr::null(),
};
data.self_ref = &data.data as *const i32;
// 固定到堆上防止移动
let pinned = Box::pin(data);
// 访问固定数据
let mut_ref: Pin<&mut MySelfReferential> = pinned.as_mut();
unsafe {
println!("Data: {}, Self-ref: {}",
mut_ref.data,
*mut_ref.self_ref); // 安全访问自引用数据
}
}Stream Trait
表示异步值序列(异步迭代器):
rust
use std::pin::Pin;
use std::task::{Context, Poll};
// 定义 Stream trait
trait Stream {
type Item; // 流产生的值类型
// 尝试获取下一个值
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>
) -> Poll<Option<Self::Item>>;
}
// 示例:实现计数器流
struct CounterStream {
count: i32,
max: i32,
}
impl Stream for CounterStream {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
if self.count < self.max {
let current = self.count;
self.count += 1;
Poll::Ready(Some(current)) // 返回下一个值
} else {
Poll::Ready(None) // 流结束
}
}
}StreamExt Trait
为 Stream 提供实用方法:
rust
// 扩展 trait 自动为所有 Stream 实现
trait StreamExt: Stream {
// 获取下一个值的异步方法
async fn next(&mut self) -> Option<Self::Item>
where
Self: Unpin;
}
// 使用示例(结合 trpl crate)
async fn use_stream() {
use trpl::StreamExt;
// 创建接收者流
let (tx, rx) = trpl::channel();
let mut rx_stream = trpl::ReceiverStream::new(rx);
// 发送数据
tx.send(10).unwrap();
tx.send(20).unwrap();
// 使用 next 方法获取值
while let Some(value) = rx_stream.next().await {
println!("Received: {}", value);
}
// 输出:
// Received: 10
// Received: 20
}综合使用示例
rust
use trpl::{sleep, spawn_task, join, StreamExt};
use std::time::Duration;
async fn async_main() {
// 创建异步任务
let task1 = spawn_task(async {
sleep(Duration::from_secs(1)).await;
"Task 1 completed"
});
let task2 = spawn_task(async {
sleep(Duration::from_secs(2)).await;
"Task 2 completed"
});
// 等待多个任务完成
let (result1, result2) = join!(task1, task2).await;
println!("{}", result1.unwrap()); // Task 1 completed
println!("{}", result2.unwrap()); // Task 2 completed
// 使用间隔流
let mut interval = trpl::interval(Duration::from_secs(1));
for _ in 0..3 {
interval.next().await; // 每秒触发一次
println!("Tick!");
}
}结合使用 future、任务和线程
略