Skip to content

异步 Asynchronous

异步解决的是并发问题,而不是并行,那是多线程的活儿。

  • 异步 (Async)解决高并发 I/O 密集型问题

    • 核心:用少量 OS 线程处理大量任务 (如网络请求、文件读写)。
    • 机制: 当任务(future类型,如网络请求)等待 I/O 时,运行时将其挂起,线程转去执行其他就绪任务。由运行时自动调度任务的恢复(协作式多任务)。
    • Rust 实现: async/await + 异步运行时 (如 tokio, async-std)。

    多线程 (Multithreading)解决 CPU 密集型并行计算问题

    • 核心:利用多核 CPU 同时执行计算 (如图像处理、数值模拟)。
    • 机制: OS 直接管理多个线程,在 CPU 核心上真正并行运行 (抢占式多任务)。
    • Rust 实现: std::thread 创建线程 + 同步原语 (Mutex, Arc 等) 管理共享数据。

核心元素

  • Future(未来量):一个可能现在还未准备好(就绪),但将来会准备好(就绪)的值
    • 在其他语言中也称为 task 或 promise.
    • 在 Rust 中,Future 是实现了 Future trait 的类型
  • async 关键字:用于代码块或函数,来表示可被中断和恢复
    • 将函数或代码块转换为返回 Future 的形式(async 整体就是一个 future 类型)
    • await 关键字: 用于等待 Future 就绪
    • 提供暂停和恢复执行的点
    • 通过**“轮询”(polling) **检查 Future 值是否可用
rust
use trpl::{Either, Html};

fn main() {
    // 从命令行获取参数,args[0] 是程序名,后面是用户输入的内容
    let args: Vec<String> = std::env::args().collect();

    // trpl::run 接受一个异步块(future),负责启动异步运行时并执行其中的代码
    // 这里我们把主要逻辑放到一个 async 块里,然后通过 run 来运行
    trpl::run(async {
        // 创建两个 future,不会立刻发送请求。只是表示“将来会做某事”的值。
        // page_title 返回一个 future,当我们 await 它时才会真正开始请求和处理。
        let title_fut_1 = page_title(&args[1]);
        let title_fut_2 = page_title(&args[2]);

        // trpl::race 接受两个 future,返回最快完成的那个结果。
        // 这里 .await 会挂起当前异步任务,直到其中一个 future 完成。
        // race 返回 Either::Left 或 Either::Right,用来区分是第一个 future 先完成还是第二个先完成。
        let (url, maybe_title) =
            match trpl::race(title_fut_1, title_fut_2).await {
                // 如果左边的 future 先完成,就进入这里
                Either::Left(left) => left,
                // 如果右边的 future 先完成,就进入这里
                Either::Right(right) => right,
            };

        // 打印哪个 URL 最先返回
        println!("{url} returned first");
        // 对解析到的标题做进一步处理
        match maybe_title {
            Some(title) => println!("Its page title is: '{title}'"),
            None => println!("Its title could not be parsed."),
        }
    })
}

// 这是一个异步函数,返回类型是 Future,实际类型是 impl Future<Output = (&str, Option<String>)>
// page_title 函数表示:给定一个 URL,异步地去获取页面文本,然后尝试解析<title>标签
async fn page_title(url: &str) -> (&str, Option<String>) {
    // trpl::get(url).await 表示发送 HTTP 请求,.await 表示要等网络请求完成
    // text().await 表示等到响应体内容被读取完,得到一个 String
    let text = trpl::get(url).await.text().await;

    // 解析 HTML:Html::parse(&text) 生成一个文档对象
    // select_first("title") 找到第一个<title>元素,如果有就返回元素对象,否则返回 Err
    // map 把找到的元素转换成 inner_html(实际标题内容)
    let title = Html::parse(&text)
        .select_first("title")
        .map(|title| title.inner_html());

    // 返回一个元组:原始 URL(&str),以及可能存在的标题(Option<String>)
    (url, title)
}

补充理解:

  • 在 async 块里,当遇到 .await 时,异步运行时会收回控制权去执行另一个 async 块(因为等待 I/O 响应太费时间了,先去干别的,让cpu不“空转”)
  • 在同一个 async 块中,代码是顺序执行的(就像单线程那样),只有等 .await 代码有结果了才会继续往下执行(并非遇到 .await 就跳过去了)
  • 异步运行时在切换 async 块时会检查 .await 代码是否已完成。若已完成,则会从该代码块的暂停处(也就是.await)继续执行

把 Async 用于并发的小细节

对应文档:使用 Async 应用并发

(略)

处理多个 Futures / 多 Async 注意事项

  • 在同一个阻塞块(用于存放多个异步任务)中,若一个 async 块中不存在 .await 语句
    • 会导致异步运行时无法介入,从而使其他 Async 块被饿死(不会被执行/执行不完全)
  • 可时不时插入 trpl::yield now().await;(这一句仅适用于 trpl 提供的运行时)语句让出控制权

流(Streams)

Rust的流(Stream)就像一条懒洋洋的传送带,它慢悠悠地吐着未来值(Future),你得用.await像钩子一样把值拽下来,它天生就是异步的,不像迭代器那样急吼吼地当场算完,而是像会摸鱼的打工人,不到最后一刻绝不干活,必须搭配StreamExt工具箱里的map/filter/fold这些扳手才能驯服,本质上就是个披着异步外衣的迭代器,专治各种IO密集型的慢性子任务。

  • 流(个人理解) = 异步版迭代器 + 消息传递机制
    • 流其实是一个数据流整体,流也有发送端和接收端(就像线程之间的消息传递那样),数据在发送后、被接收前需要排队
    • 流的上游发送端是一个惰性的迭代器,只有被下游催促才会发出数据
    • 在异步运行时切换到下游任务之前,数据会排队等候
  • 就像线程之间传递那样,流是在同一个异步运行时中的异步任务(async 块)之间进行传递
  • 不同的是,迭代器是主动提交,而且是一口气交完。而流,下游需要提交申请 .next(),而且在 .await .next().await() 后,运行时才会切过去发送、执行到上游的.await时再切回来接收,再再继续执行下游的后续代码
rust
// 官方文档里的代码太乱了,以下代码使用AI分析页面、合并整理,可能会出现顺序错误等问题

// 导入必要的模块和trait
use std::{pin::pin, time::Duration};
use trpl::{channel, run, sleep, spawn_task, ReceiverStream, Stream, StreamExt};
// 引入必要的trait和类型
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    // 使用trpl提供的异步运行时运行程序
    trpl::run(async {
        // 创建带超时的消息流(200毫秒超时)
        let messages = get_messages().timeout(Duration::from_millis(200));
        
        // 创建间隔流,转换为字符串,限流(100毫秒),设置10秒超时
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .throttle(Duration::from_millis(100))
            .timeout(Duration::from_secs(10));
        
        // 合并两个流
        let merged = messages.merge(intervals).take(20);
        
        // Pin住流以便安全轮询
        let mut stream = pin!(merged);

        // 处理合并流中的每个项
        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

/// 创建消息流(模拟异步消息到达)
fn get_messages() -> impl Stream<Item = String> {
    // 创建无界信道(发送端和接收端)
    let (tx, rx) = trpl::channel();

    // 创建异步任务发送消息
    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        
        // 遍历消息并添加随机延迟
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            // 发送消息并处理可能的错误
            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    // 将接收端转换为流
    ReceiverStream::new(rx)
}

/// 创建间隔计数器流(每毫秒计数)
fn get_intervals() -> impl Stream<Item = u32> {
    // 创建无界信道
    let (tx, rx) = trpl::channel();

    // 创建异步任务发送计数
    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            
            // 发送计数并处理可能的错误
            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            }
        }
    });

    // 将接收端转换为流
    ReceiverStream::new(rx)
}

真实场景(瞎猜的)

上游等待网络文件下载或硬盘读取 - 异步运行时切换任务 - 下游请求数据 - 异步运行时切换任务 - 上游发送了手里拿到的部分文件数据 - (如此循环)

rust
// 以下代码由 deepseek-r1 兴致勃勃地实现(不确保可以运行)

use std::{pin::Pin, time::Duration};
use trpl::{Stream, StreamExt};

// 模拟从网络下载大文件的函数
async fn download_large_file() -> impl Stream<Item = Vec<u8>> {
    // 创建异步信道(流的实现基础)
    let (tx, rx) = trpl::channel();
    
    // 启动独立任务处理下载
    trpl::spawn_task(async move {
        println!("[生产者] 开始下载文件...");
        
        // 模拟大文件分块下载(假设文件有5个分块)
        for chunk_index in 1..=5 {
            // 模拟网络延迟(每次下载需要200ms)
            trpl::sleep(Duration::from_millis(200)).await;
            
            // 模拟获取数据块
            let chunk = vec![0u8; 1024 * 1024]; // 1MB数据块
            
            println!("[生产者] 已下载分块 {} ({}字节)", chunk_index, chunk.len());
            
            // 将数据块发送到下游
            if tx.send(chunk).is_err() {
                println!("[生产者] 下游已停止接收,提前终止下载");
                break;
            }
        }
        
        println!("[生产者] 文件下载完成");
    });
    
    // 将接收端转换为流
    ReceiverStream::new(rx)
}

#[tokio::main]
async fn main() {
    println!("[主任务] 启动文件处理程序");
    
    // 创建文件下载流
    let mut download_stream = download_large_file().await;
    
    // 处理下载的数据块
    let mut processed_bytes = 0;
    while let Some(chunk) = download_stream.next().await {
        /*
        此时运行时切换:
        1. 下游(主任务)调用 next().await 请求数据
        2. 运行时发现数据尚未就绪,挂起主任务
        3. 运行时切换到下载任务执行
        4. 下载任务完成当前分块后发送数据
        5. 运行时唤醒主任务继续执行
        */
        
        println!("[消费者] 接收到数据块 ({}字节)", chunk.len());
        
        // 模拟数据处理(需要50ms)
        trpl::sleep(Duration::from_millis(50)).await;
        
        processed_bytes += chunk.len();
        println!("[消费者] 已处理数据块,累计处理: {} MB", processed_bytes / (1024*1024));
    }
    
    println!("[主任务] 文件处理完成");
}

Async 相关的 Traits

没搞懂,下面的笔记是AI总结的,来自官方文档。

Future Trait

表示一个异步计算,会在未来某个时刻完成并返回结果:

rust
use std::pin::Pin;
use std::task::{Context, Poll};

// 定义 Future trait
pub trait Future {
    type Output;  // 异步计算完成后返回的类型

    // 检查异步操作是否完成
    // self: Pin<&mut Self> - 确保 Future 不会被意外移动
    // cx: &mut Context - 提供唤醒机制
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// 示例:实现一个简单的 Future
struct ImmediateFuture(i32);

impl Future for ImmediateFuture {
    type Output = i32;
    
    fn poll(self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
        // 立即返回就绪状态和结果值
        Poll::Ready(self.get_mut().0)
    }
}

// 使用示例
async fn use_future() {
    let fut = ImmediateFuture(42);
    let result = fut.await;  // await 会调用 poll 方法
    println!("Result: {}", result);  // 输出: Result: 42
}

PinUnpin

用于处理自引用结构的内存安全问题:

rust
use std::pin::Pin;

// Unpin 标记 trait - 表示类型可以安全移动
struct SafeToMove(i32);
impl Unpin for SafeToMove {}  // 自动实现

// !Unpin 类型 - 不能安全移动(编译器自动为异步块生成)
struct MySelfReferential {
    data: i32,
    self_ref: *const i32,  // 指向自身的指针
}

impl !Unpin for MySelfReferential {}  // 明确标记为不可移动

// 使用示例
fn pin_example() {
    // 创建自引用结构
    let mut data = MySelfReferential {
        data: 42,
        self_ref: std::ptr::null(),
    };
    data.self_ref = &data.data as *const i32;

    // 固定到堆上防止移动
    let pinned = Box::pin(data);
    
    // 访问固定数据
    let mut_ref: Pin<&mut MySelfReferential> = pinned.as_mut();
    unsafe {
        println!("Data: {}, Self-ref: {}", 
            mut_ref.data, 
            *mut_ref.self_ref);  // 安全访问自引用数据
    }
}

Stream Trait

表示异步值序列(异步迭代器):

rust
use std::pin::Pin;
use std::task::{Context, Poll};

// 定义 Stream trait
trait Stream {
    type Item;  // 流产生的值类型
    
    // 尝试获取下一个值
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Option<Self::Item>>;
}

// 示例:实现计数器流
struct CounterStream {
    count: i32,
    max: i32,
}

impl Stream for CounterStream {
    type Item = i32;
    
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
        if self.count < self.max {
            let current = self.count;
            self.count += 1;
            Poll::Ready(Some(current))  // 返回下一个值
        } else {
            Poll::Ready(None)  // 流结束
        }
    }
}

StreamExt Trait

Stream 提供实用方法:

rust
// 扩展 trait 自动为所有 Stream 实现
trait StreamExt: Stream {
    // 获取下一个值的异步方法
    async fn next(&mut self) -> Option<Self::Item>
    where
        Self: Unpin;
}

// 使用示例(结合 trpl crate)
async fn use_stream() {
    use trpl::StreamExt;
    
    // 创建接收者流
    let (tx, rx) = trpl::channel();
    let mut rx_stream = trpl::ReceiverStream::new(rx);
    
    // 发送数据
    tx.send(10).unwrap();
    tx.send(20).unwrap();
    
    // 使用 next 方法获取值
    while let Some(value) = rx_stream.next().await {
        println!("Received: {}", value);
    }
    // 输出:
    // Received: 10
    // Received: 20
}

综合使用示例

rust
use trpl::{sleep, spawn_task, join, StreamExt};
use std::time::Duration;

async fn async_main() {
    // 创建异步任务
    let task1 = spawn_task(async {
        sleep(Duration::from_secs(1)).await;
        "Task 1 completed"
    });
    
    let task2 = spawn_task(async {
        sleep(Duration::from_secs(2)).await;
        "Task 2 completed"
    });
    
    // 等待多个任务完成
    let (result1, result2) = join!(task1, task2).await;
    
    println!("{}", result1.unwrap());  // Task 1 completed
    println!("{}", result2.unwrap());  // Task 2 completed
    
    // 使用间隔流
    let mut interval = trpl::interval(Duration::from_secs(1));
    for _ in 0..3 {
        interval.next().await;  // 每秒触发一次
        println!("Tick!");
    }
}

结合使用 future、任务和线程