futures

https://github.com/rust-lang/futures-rs

https://docs.rs/futures/0.3.4/futures/index.html

A Brief Look at Async Rust

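If, like me, you are only an average programmer, you will find many of the concepts in futures very hard to understand.
The official documentation is there to consult, but it is very terse and not well suited to practice.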

Future

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

A Future represents an asynchronous computation that can produce a value (although that value may be empty, for example ()).

The poll method

Attempt to resolve the future to a final value, registering the current task for wakeup if the value is not yet available.

Return value:

  • Poll::Pending if the future is not ready yet
  • Poll::Ready(val) with the result val of this future if it finished successfully.

Once a future has finished, clients should not poll it again.

When a future is not ready yet, poll returns Poll::Pending and stores a clone of the Waker copied from the current Context.
This Waker is then woken once the future can make progress.
For example, a future waiting for a socket to become readable would call .clone() on the Waker and store it.
When a signal arrives elsewhere indicating that the socket is readable, Waker::wake is called and the socket future's task is awoken.
Once a task has been woken up, it should attempt to poll the future again, which may or may not produce a final value.

Note that on multiple calls to poll, only the Waker from the Context passed to the most recent call should be scheduled to receive a wakeup.
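The Waker handshake described above can be sketched with the standard library alone. This is a minimal illustration, not how async-std or futures implement it: ValueFuture, value_after, ThreadWaker and block_on are hypothetical names invented for this example, and a sleeping thread stands in for the "socket became readable" signal.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the future and the thread that completes it.
#[derive(Default)]
struct Shared {
    value: Option<u32>,
    waker: Option<Waker>,
}

/// Hypothetical future that becomes ready once another thread stores a value.
struct ValueFuture {
    shared: Arc<Mutex<Shared>>,
}

impl Future for ValueFuture {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut shared = self.shared.lock().unwrap();
        match shared.value.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Not ready: keep only the Waker from the most recent poll.
                shared.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Returns a future that resolves to `v` after `delay` (the "signal from elsewhere").
fn value_after(delay: Duration, v: u32) -> ValueFuture {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let producer = Arc::clone(&shared);
    thread::spawn(move || {
        thread::sleep(delay);
        let mut s = producer.lock().unwrap();
        s.value = Some(v);
        // Wake the task, if it has already polled, so that it polls again.
        if let Some(w) = s.waker.take() {
            w.wake();
        }
    });
    ValueFuture { shared }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor: poll, then sleep until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // A spurious wakeup only causes one extra poll.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let v = block_on(value_after(Duration::from_millis(50), 7));
    println!("got {}", v);
}
```

If the producer thread never called wake, block_on would stay parked forever, which is why a hand-written future must always arrange for its Waker to be woken.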

join

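Polls several futures concurrently and finishes only after all of them have completed, returning a tuple of their outputs.
(Returning as soon as any one future completes is what select does.)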

Cargo.toml:

[dependencies]
async-std = "*"
futures = "*"
chrono = "*"

src/main.rs:

use async_std::task;
use std::time::Duration;
use chrono::Local;

async fn hello_world1() {
    task::sleep(Duration::from_secs(2)).await;
    println!("hello world1!, {}", Local::now());
}

async fn hello_world2() {
    task::sleep(Duration::from_secs(3)).await;
    println!("hello world2!, {}", Local::now());
}

async fn say_hello() {
    futures::join!(hello_world1(), hello_world2());
}

fn main() {
    println!("start time = {}", Local::now());
    task::block_on(say_hello());
    println!("end time = {}", Local::now());
}
// start time = 2020-04-12 16:58:18.258229 +08:00
// hello world1!, 2020-04-12 16:58:20.264633 +08:00
// hello world2!, 2020-04-12 16:58:21.264097 +08:00
// end time = 2020-04-12 16:58:21.264164 +08:00

ready

Creates a future that is already complete and resolves immediately to the given value.

Cargo.toml:

[dependencies]
async-std = "*"
futures = "*"

src/main.rs:

use futures::future;
use async_std::task;

async fn run() {
    let a = future::ready(1);
    let res = a.await;
    println!("{}", res);
}

fn main() {
    task::block_on(run());
}
// 1

futures::select

Polls several futures concurrently and finishes as soon as any one of them completes.

use async_std::task;
use futures::future;
use futures::select;

async fn run() {
    let mut a = future::ready(4);
    let mut b = future::pending::<()>();

    let res = select! {
        a_res = a => a_res + 1,
        _ = b => 0,
    };
    println!("res = {}", res);
}

fn main() {
    task::block_on(run());
}
// res = 5

The same macro with two async functions; each future is fused with .fuse() before it is handed to select!:

use async_std::task;
use std::time::Duration;
use chrono::Local;
use futures::select;
use futures::future::FutureExt;

async fn hello_world1() {
    task::sleep(Duration::from_secs(2)).await;
    println!("hello world1!, {}", Local::now());
}

async fn hello_world2() {
    task::sleep(Duration::from_secs(3)).await;
    println!("hello world2!, {}", Local::now());
}

async fn say_hello() {
    let res = select! {
        _ = hello_world1().fuse() => {
            println!("hello world1");
            1
        },
        _ = hello_world2().fuse() => {
            println!("hello world2");
            2
        },
    };
    println!("{}", res);
}

fn main() {
    println!("start time = {}", Local::now());
    task::block_on(say_hello());
    println!("end time = {}", Local::now());
}
// start time = 2020-04-12 17:11:27.358128 +08:00
// hello world1!, 2020-04-12 17:11:29.364635 +08:00
// hello world1
// 1
// end time = 2020-04-12 17:11:29.364711 +08:00

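Pin and Unpin

Pin<P> is a smart-pointer wrapper defined in the std::pin module.
It is a library type rather than new syntax; after several design iterations it was stabilized in Rust 1.33.

Pin<P> is a struct that wraps a pointer type P, where "pointer type" means a type that implements Deref.

As the name suggests, Pin means to pin something down. In Rust, wrapping a pointer in Pin means the data it points to is pinned to its memory location and must not be moved.
Unpin is the counterpart of Pin: it marks types whose values can still be moved safely even after they have been pinned.
Most types implement Unpin automatically.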
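The difference between the two can be seen with a small standard-library sketch. NotUnpin, swap_through_pin and pinned_value are hypothetical names made up for this illustration; PhantomPinned is the standard marker for opting out of Unpin.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;

/// Hypothetical type that opts out of Unpin via the PhantomPinned marker.
struct NotUnpin {
    value: u32,
    _pin: PhantomPinned,
}

/// For an Unpin type such as String, Pin is transparent:
/// we can get `&mut T` back out and move the values around.
fn swap_through_pin(a: &mut String, b: &mut String) {
    let pa: Pin<&mut String> = Pin::new(a);
    let pb: Pin<&mut String> = Pin::new(b);
    // get_mut is only available because String: Unpin.
    std::mem::swap(pa.get_mut(), pb.get_mut());
}

/// A !Unpin value has to be pinned behind a pointer, here on the heap.
fn pinned_value() -> Pin<Box<NotUnpin>> {
    Box::pin(NotUnpin { value: 42, _pin: PhantomPinned })
}

fn main() {
    let mut a = String::from("left");
    let mut b = String::from("right");
    swap_through_pin(&mut a, &mut b);
    println!("{} {}", a, b); // prints "right left"

    let p = pinned_value();
    // Shared access still works through Deref.
    println!("{}", p.value); // prints "42"
    // `p.as_mut().get_mut()` would not compile: NotUnpin does not implement
    // Unpin, so safe code cannot obtain `&mut NotUnpin` and move the value.
}
```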

If a similar async function is called outside of select to produce a Future, the Future must be pinned in order to be able to pass it to select.
This can be achieved via Box::pin for pinning a Future on the heap or the pin_mut! macro for pinning a Future on the stack.

use async_std::task;
use std::time::Duration;
use chrono::Local;
use futures::select;
use futures::future::FutureExt;
use futures::pin_mut;

async fn hello_world1() {
    task::sleep(Duration::from_secs(2)).await;
    println!("hello world1!, {}", Local::now());
}

async fn hello_world2() {
    task::sleep(Duration::from_secs(3)).await;
    println!("hello world2!, {}", Local::now());
}

async fn say_hello() {
    let fut_1 = hello_world1().fuse();
    let fut_2 = hello_world2().fuse();
    pin_mut!(fut_1, fut_2); // Pins the Future on the stack
    let res = select! {
        _ = fut_1 => {
            println!("hello world1");
            1
        },
        _ = fut_2 => {
            println!("hello world2");
            2
        },
    };
    println!("{}", res);
}

fn main() {
    println!("start time = {}", Local::now());
    task::block_on(say_hello());
    println!("end time = {}", Local::now());
}
// start time = 2020-04-12 17:22:07.383247 +08:00
// hello world1!, 2020-04-12 17:22:09.388909 +08:00
// hello world1
// 1
// end time = 2020-04-12 17:22:09.388959 +08:00

Message passing

https://docs.rs/futures/0.3.4/futures/channel/index.html

mpsc

A multi-producer, single-consumer queue for sending values across asynchronous tasks.

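A producer can be cloned with clone(); there is only one consumer, and it has no clone() method.

For example, in let (tx, rx) = mpsc::unbounded::<i32>();
tx is the producer and rx is the consumer. Calling tx.clone() creates another producer, whereas rx has no clone() method.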

Cargo.toml:

[dependencies]
async-std = "*"
futures = "*"

src/main.rs:

use std::{env, io::Error as IoError};
use async_std::task;
use async_std::net::{TcpListener, TcpStream};
use futures::{future, channel::mpsc, StreamExt};

type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

async fn receive_code(code_receiver: Receiver<String>) {

    let task = code_receiver.for_each(|s| {
        println!("receive {}", s);
        future::ready(())
    });
    task.await;
}

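async fn handle_connection(_stream: TcpStream, code_sender: Sender<String>) {
    // unbounded_send fails only if the receiving end is gone (dropped or closed).
    if let Err(e) = code_sender.unbounded_send(String::from("000001")) {
        eprintln!("send failed: {}", e);
    }
}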

async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8088".to_string());

    let listener = TcpListener::bind(&addr).await.expect("Failed to bind");

    let (code_sender, code_receiver) = mpsc::unbounded();

    task::spawn(receive_code(code_receiver));

    while let Ok((stream, _)) = listener.accept().await {
        task::spawn(handle_connection(stream, code_sender.clone()));
    }

    Ok(())
}

fn main() -> Result<(), IoError> {
    task::block_on(run())
}

The same receiver written as a while let loop over next():

use std::{env, io::Error as IoError};
use async_std::task;
use async_std::net::{TcpListener, TcpStream};
use futures::{channel::mpsc, StreamExt};

type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

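async fn receive_code(mut code_receiver: Receiver<String>) {
    while let Some(code) = code_receiver.next().await {
        println!("code = {}", code);
    }
    // Never printed: run() keeps the original code_sender alive for as long
    // as it accepts connections, so the channel is never closed.
    println!("disconnect!");
}

async fn handle_connection(_stream: TcpStream, code_sender: Sender<String>) {
    // unbounded_send fails only if the receiving end is gone (dropped or closed).
    if let Err(e) = code_sender.unbounded_send(String::from("000001")) {
        eprintln!("send failed: {}", e);
    }
}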

async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8088".to_string());

    let listener = TcpListener::bind(&addr).await.expect("Failed to bind");

    let (code_sender, code_receiver) = mpsc::unbounded();

    task::spawn(receive_code(code_receiver));

    while let Ok((stream, _)) = listener.accept().await {
        task::spawn(handle_connection(stream, code_sender.clone()));
    }

    Ok(())
}

fn main() -> Result<(), IoError> {
    task::block_on(run())
}

oneshot

A channel for sending a single message between asynchronous tasks.

Either

Enum futures::future::Either

pub enum Either<A, B> {
    Left(A),
    Right(B),
}

Combines two different futures, streams, or sinks having the same associated types into a single type.
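A typical use is a function whose branches produce different future types but must return a single type. The sketch below uses only the standard library: the Either enum here is a local stand-in with the same shape as futures::future::Either, not the crate's implementation, and it is simplified by requiring Unpin. Doubled, pick, NoopWake and poll_once are hypothetical helpers for the illustration.

```rust
use std::future::{ready, Future, Ready};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in with the same shape as futures::future::Either.
enum Either<A, B> {
    Left(A),
    Right(B),
}

// Simplified: both sides must be Unpin so no pin projection is needed.
impl<A, B> Future for Either<A, B>
where
    A: Future + Unpin,
    B: Future<Output = A::Output> + Unpin,
{
    type Output = A::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Delegate to whichever future this value holds.
        match self.get_mut() {
            Either::Left(a) => Pin::new(a).poll(cx),
            Either::Right(b) => Pin::new(b).poll(cx),
        }
    }
}

/// Hypothetical second future type, also yielding u32.
struct Doubled(u32);

impl Future for Doubled {
    type Output = u32;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u32> {
        Poll::Ready(self.0 * 2)
    }
}

/// Both branches return the same concrete type: Either<Ready<u32>, Doubled>.
fn pick(double: bool, n: u32) -> Either<Ready<u32>, Doubled> {
    if double {
        Either::Right(Doubled(n))
    } else {
        Either::Left(ready(n))
    }
}

/// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls a future exactly once.
fn poll_once<F: Future + Unpin>(mut fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(&mut fut).poll(&mut cx)
}

fn main() {
    println!("{:?}", poll_once(pick(false, 4))); // prints "Ready(4)"
    println!("{:?}", poll_once(pick(true, 4))); // prints "Ready(8)"
}
```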

Stream

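Future is the most fundamental concept in asynchronous development. If a Future represents a single asynchronous value, a Stream represents a series of asynchronous values.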

pub trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

Stream is the asynchronous counterpart of the synchronous Iterator.

pub trait Iterator {
    type Item;

    fn next(&mut self) -> Option<Self::Item>;
}

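In synchronous Rust, the core stream abstraction is Iterator.
It provides a way to yield items one after another, blocking between each of them.
Composition is done by passing iterators into the constructors of other iterators, which lets us plug everything together with very little effort.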
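As a small illustration of that composition style, each adaptor below (filter, map) takes the previous iterator and wraps it in a new one. The function name even_squares is made up for this example.

```rust
/// Squares of the even numbers in 1..=limit, built by stacking adaptors.
fn even_squares(limit: u32) -> impl Iterator<Item = u32> {
    (1..=limit)
        .filter(|n| n % 2 == 0) // wraps the range in a Filter iterator
        .map(|n| n * n) // wraps the Filter in a Map iterator
}

fn main() {
    // Nothing runs until the final iterator is consumed.
    let squares: Vec<u32> = even_squares(6).collect();
    println!("{:?}", squares); // prints "[4, 16, 36]"
}
```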

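In asynchronous Rust, the core stream abstraction is Stream.
It behaves much like Iterator, but instead of blocking between yielded items it lets other tasks run while it waits.

A Stream abstracts a continuous source of data, which can nevertheless come to an end (when poll_next returns None).
It can model the read half of a WebSocket connection: the server keeps receiving and processing values from the client until the client disconnects.
Taking the abstraction further, a consumer in a message queue (MQ) or the receiving side of a TCP connection can also be seen as a Stream, which is why the Stream abstraction matters so much in asynchronous programming.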
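To make the poll_next contract concrete, here is a standard-library-only sketch. The Stream trait below is a local copy of the trait shape shown earlier (the real one lives in the futures crate), and Counter, NoopWake and collect_all are hypothetical names. The driver loop spins instead of sleeping, which a real executor would not do.

```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local copy of the Stream trait shape (the real trait is in the futures crate).
trait Stream {
    type Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical stream yielding 1..=max, reporting Pending once before every item.
struct Counter {
    next: u32,
    max: u32,
    ready: bool,
}

impl Stream for Counter {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        if self.next > self.max {
            return Poll::Ready(None); // the stream has ended
        }
        if !self.ready {
            // Pretend the item is not available yet and ask to be polled again.
            self.ready = true;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        self.ready = false;
        let item = self.next;
        self.next += 1;
        Poll::Ready(Some(item))
    }
}

/// Waker that does nothing; the driver below simply polls again.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drains a stream by polling in a loop until it returns None.
fn collect_all<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut items = Vec::new();
    loop {
        match Pin::new(&mut stream).poll_next(&mut cx) {
            Poll::Ready(Some(item)) => items.push(item),
            Poll::Ready(None) => return items,
            Poll::Pending => {} // an executor would run other tasks here
        }
    }
}

fn main() {
    let items = collect_all(Counter { next: 1, max: 3, ready: false });
    println!("{:?}", items); // prints "[1, 2, 3]"
}
```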

forward

fn forward<S>(self, sink: S) -> Forward<Self, S>
where
    S: Sink<Self::Ok>,
    Self: TryStream<Error = <S as Sink<Self::Ok>>::Error>, 

A future that completes after the given stream has been fully processed into the sink and the sink has been flushed and closed.

This future will drive the stream to keep producing items until it is exhausted, sending each item to the sink.
It will complete once the stream is exhausted, the sink has received and flushed all items, and the sink is closed.
Note that neither the original stream nor provided sink will be output by this future.
Pass the sink by Pin<&mut S> (for example, via forward(&mut sink) inside an async fn/block) in order to preserve access to the Sink.

use std::{env, io::Error as IoError, net::SocketAddr};
use futures::prelude::*;
use futures::channel::mpsc::unbounded;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use tungstenite::protocol::Message;

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = async_tungstenite::accept_async(raw_stream)
        .await
        .expect("Error during the websocket handshake occurred");
    println!("WebSocket connection established: {}", addr);

    // Channel that queues outgoing messages for this peer.
    let (tx, rx) = unbounded();
    tx.unbounded_send(Message::Text("hhhh".to_string())).ok();
    let (outgoing, mut incoming) = ws_stream.split();

    // let (new_sender, new_receiver) = unbounded();
    let task_forward_write = rx.map(Ok).forward(outgoing);
    // let new_task_forward_write = rx.map(Ok).forward(new_sender);

    let task_read = async {
        while let Some(item) = incoming.next().await {
            let item = item.unwrap();
            match item {
                Message::Text(s) => {
                    println!("{}", s);
                    tx.unbounded_send(Message::Text(s.to_string())).ok();
                },
                Message::Ping(v) => {
                    tx.unbounded_send(Message::Pong(v)).ok();
                },
                Message::Pong(_) => (),
                Message::Binary(_) => (),
                Message::Close(_) => {break;},
            }
        }
    };

    futures::select! {
        _ = task_read.fuse() => (), 
        _ = task_forward_write.fuse() => (),
    };
    println!("{} disconnected", &addr);
}

async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());

    // Create the event loop and TCP listener we'll accept connections on.
    let try_socket = TcpListener::bind(&addr).await;
    let listener = try_socket.expect("Failed to bind");
    println!("Listening on: {}", addr);

    // Let's spawn the handling of each connection in a separate task.
    while let Ok((stream, addr)) = listener.accept().await {
        task::spawn(handle_connection(stream, addr));
    }

    Ok(())
}

fn main() -> Result<(), IoError> {
    task::block_on(run())
}

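Notes

  • Do not perform any blocking operation inside an async function. That means not only thread::sleep but also the standard library's TCP/UDP types and the channel, Mutex, and RwLock from std::sync: none of them should be used unless you know exactly what you are doing! Replace them with the versions implemented in async-std and futures.
  • Unless it is really necessary, do not try to implement Future yourself. A hand-written future that never triggers a wake will never be polled again. Instead, compose futures that are already implemented.
  • Use async/await at every point that needs to wait asynchronously; this greatly simplifies your code.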

futures_timer

Cargo.toml:

[dependencies]
async-std = "*"
futures = "*"
chrono = "*"
futures-timer = "*"

src/main.rs:

use async_std::task;
use std::time::Duration;
use chrono::Local;
use futures_timer::Delay;

async fn hello_world1() {
    // task::sleep(Duration::from_secs(2)).await;
    Delay::new(Duration::from_secs(2)).await;
    println!("hello world1!, {}", Local::now());
}

async fn hello_world2() {
    Delay::new(Duration::from_secs(3)).await;
    // task::sleep(Duration::from_secs(3)).await;
    println!("hello world2!, {}", Local::now());
}

async fn say_hello() {
    futures::join!(hello_world1(), hello_world2());
}

fn main() {
    println!("start time = {}", Local::now());
    task::block_on(say_hello());
}
// start time = 2020-04-18 09:44:46.768148 +08:00
// hello world1!, 2020-04-18 09:44:48.774257 +08:00
// hello world2!, 2020-04-18 09:44:49.774808 +08:00