Skip to content

其他

smol

https://github.com/stjepang/smol

Cargo.toml中添加依赖:

1
smol = { version = "0.1", features = ["tokio02"] }

这样就可以让 async-std 和 tokio 一起使用。(注:目前已不再需要 smol,直接为 async-std 启用相应的 features 即可。)

async-tungstenite

https://docs.rs/async-tungstenite/0.4.2/async_tungstenite/index.html

https://github.com/sdroege/async-tungstenite

tungstenite

https://github.com/snapview/tungstenite-rs

https://docs.rs/tungstenite/0.10.1/tungstenite/

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use std::env;
use futures::StreamExt;
use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;

/// Receive-only WebSocket client.
///
/// Connects to the URL given as the first command-line argument (falling
/// back to a built-in default) and writes the payload of every received
/// message to stdout until the stream ends.
///
/// # Panics
///
/// Panics if the connection cannot be established, if the stream yields an
/// error, or if writing to stdout fails.
async fn run() {
    let url = match env::args().nth(1) {
        Some(arg) => arg,
        None => "ws://hq.sinajs.cn/wskt?list=s_sh000001".to_string(),
    };

    let (socket, _) = connect_async(&url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    // Only the read half is needed; the write half is discarded right away.
    let (_, incoming) = socket.split();

    incoming
        .for_each(|frame| async {
            let payload = frame.unwrap().into_data();
            let mut out = async_std::io::stdout();
            out.write_all(&payload).await.unwrap();
        })
        .await;
}

/// Entry point: drives the async client to completion on the current thread.
fn main() {
    let client = run();
    task::block_on(client)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
use std::env;
use futures::{future, pin_mut, StreamExt};
use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;

/// WebSocket client with an outbound channel.
///
/// Connects to the URL given as the first command-line argument (falling
/// back to a built-in default), forwards anything received on an in-process
/// channel to the socket, and writes the payload of every received message
/// to stdout. Returns as soon as either direction finishes.
///
/// # Panics
///
/// Panics if the connection cannot be established, if the stream yields an
/// error, or if writing to stdout fails.
async fn run() {
    let url = match env::args().nth(1) {
        Some(arg) => arg,
        None => "ws://hq.sinajs.cn/wskt?list=s_sh000001".to_string(),
    };

    // Nothing sends on this channel yet (a stdin reader task would take the
    // sender). The sender is still bound so that it stays alive for the whole
    // function and the receiving side does not terminate immediately.
    let (_outbound_tx, outbound_rx) = futures::channel::mpsc::unbounded();

    let (socket, _) = connect_async(&url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    let (sink, incoming) = socket.split();

    let outbound = outbound_rx.map(Ok).forward(sink);
    let inbound = incoming.for_each(|frame| async {
        let payload = frame.unwrap().into_data();
        let mut out = async_std::io::stdout();
        out.write_all(&payload).await.unwrap();
    });

    // `select` needs both futures pinned; whichever completes first wins.
    pin_mut!(outbound, inbound);
    future::select(outbound, inbound).await;
}

/// Entry point: blocks the current thread until the client future resolves.
fn main() {
    let client = run();
    task::block_on(client)
}

服务端

服务端向客户端发送一条消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
use std::{
    env,
    io::Error as IoError,
    net::SocketAddr,
};
use futures::prelude::*;
use futures::{
    channel::mpsc::{unbounded, UnboundedSender},
};
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use tungstenite::protocol::Message;

/// Serves a single accepted TCP connection.
///
/// Performs the WebSocket handshake on `raw_stream`, queues one text message
/// (`"hhhh"`) for the client and forwards the queue to the socket. `addr` is
/// the peer address and is used for logging only.
///
/// A failed handshake or a failed write is reported on stderr and ends this
/// connection's task instead of panicking.
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = match async_tungstenite::accept_async(raw_stream).await {
        Ok(stream) => stream,
        Err(err) => {
            eprintln!(
                "Error during the websocket handshake occurred ({}): {}",
                addr, err
            );
            return;
        }
    };
    println!("WebSocket connection established: {}", addr);

    // Outbound queue: everything sent on `tx` is forwarded to the client.
    let (tx, rx) = unbounded();
    tx.unbounded_send(Message::Text("hhhh".to_string())).ok();

    // The read half is not consumed in this example; it is only kept alive.
    let (outgoing, _incoming) = ws_stream.split();

    // NOTE: `tx` stays in scope, so the queue never closes on its own and
    // this await only returns once writing to the socket fails.
    if let Err(err) = rx.map(Ok).forward(outgoing).await {
        eprintln!("Error while sending to {}: {}", addr, err);
    }
    println!("{} disconnected", addr);
}

/// Runs the WebSocket server.
///
/// Binds a TCP listener to the address given as the first command-line
/// argument (default `127.0.0.1:8081`) and spawns one task per accepted
/// connection.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection
/// fails. The accept loop only ends by returning such an error.
async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());

    // Propagate bind failures to the caller instead of panicking.
    let listener = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    // Handle each connection in its own task so a slow peer cannot block
    // the accept loop.
    loop {
        let (stream, peer_addr) = listener.accept().await?;
        task::spawn(handle_connection(stream, peer_addr));
    }
}

/// Entry point: runs the server on the current thread and returns its result.
fn main() -> Result<(), IoError> {
    let server = run();
    task::block_on(server)
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::{env, io::Error as IoError, net::SocketAddr};
use futures::prelude::*;
use futures::channel::mpsc::unbounded;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use tungstenite::protocol::Message;

/// Serves a single accepted TCP connection as an echo peer.
///
/// Performs the WebSocket handshake on `raw_stream`, queues an initial text
/// message (`"hhhh"`), then echoes every received text message back and
/// answers pings with pongs. `addr` is the peer address and is used for
/// logging only.
///
/// Returns when the client sends a close frame, the incoming stream ends or
/// fails, or the outbound forwarding finishes. Handshake and read errors are
/// reported on stderr instead of panicking.
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    let ws_stream = match async_tungstenite::accept_async(raw_stream).await {
        Ok(stream) => stream,
        Err(err) => {
            eprintln!(
                "Error during the websocket handshake occurred ({}): {}",
                addr, err
            );
            return;
        }
    };
    println!("WebSocket connection established: {}", addr);

    // Outbound queue: everything sent on `tx` is forwarded to the client.
    let (tx, rx) = unbounded();
    tx.unbounded_send(Message::Text("hhhh".to_string())).ok();
    let (outgoing, mut incoming) = ws_stream.split();

    let task_forward_write = rx.map(Ok).forward(outgoing);

    let task_read = async {
        while let Some(item) = incoming.next().await {
            // A read error ends this connection only; it must not panic.
            let item = match item {
                Ok(message) => message,
                Err(err) => {
                    eprintln!("Error while reading from {}: {}", addr, err);
                    break;
                }
            };
            match item {
                Message::Text(s) => {
                    println!("{}", s);
                    // `s` is already owned, so it can be echoed without cloning.
                    tx.unbounded_send(Message::Text(s)).ok();
                }
                Message::Ping(v) => {
                    tx.unbounded_send(Message::Pong(v)).ok();
                }
                Message::Pong(_) | Message::Binary(_) => (),
                Message::Close(_) => break,
            }
        }
    };

    // Stop as soon as either the reader or the writer finishes.
    futures::select! {
        _ = task_read.fuse() => (),
        _ = task_forward_write.fuse() => (),
    };
    println!("{} disconnected", addr);
}

/// Runs the WebSocket echo server.
///
/// Binds a TCP listener to the address given as the first command-line
/// argument (default `127.0.0.1:8081`) and spawns one task per accepted
/// connection.
///
/// # Errors
///
/// Returns the I/O error if binding the listener or accepting a connection
/// fails. The accept loop only ends by returning such an error.
async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());

    // Propagate bind failures to the caller instead of panicking.
    let listener = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    // Handle each connection in its own task so a slow peer cannot block
    // the accept loop.
    loop {
        let (stream, peer_addr) = listener.accept().await?;
        task::spawn(handle_connection(stream, peer_addr));
    }
}

/// Entry point: blocks on the server future and hands its result to the runtime.
fn main() -> Result<(), IoError> {
    let server = run();
    task::block_on(server)
}

tokio_tungstenite

https://docs.rs/tokio-tungstenite/0.10.1/tokio_tungstenite/index.html

https://github.com/snapview/tokio-tungstenite