其他
smol
https://github.com/stjepang/smol
在 Cargo.toml 中添加依赖:
| smol = { version = "0.1", features = ["tokio02"] }
|
这样就可以让 async-std 和 tokio 一起使用(目前已不再需要 smol,直接为 async-std 启用相应的 features 即可)
async-tungstenite
https://docs.rs/async-tungstenite/0.4.2/async_tungstenite/index.html
https://github.com/sdroege/async-tungstenite
tungstenite
https://github.com/snapview/tungstenite-rs
https://docs.rs/tungstenite/0.10.1/tungstenite/
客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 | use std::env;
use futures::StreamExt;
use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;
/// Connects to a WebSocket endpoint and copies every received payload to stdout.
///
/// The URL is taken from the first command-line argument; when none is given
/// the built-in default endpoint is used.
///
/// # Panics
///
/// Panics if the connection cannot be established, if a received message is
/// an error, or if writing to stdout fails.
async fn run() {
    let url = match env::args().nth(1) {
        Some(arg) => arg,
        None => "ws://hq.sinajs.cn/wskt?list=s_sh000001".to_string(),
    };

    let (socket, _) = connect_async(&url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    // Only the receiving half is needed; the sending half is discarded.
    let (_, mut incoming) = socket.split();

    // Pull messages one at a time until the stream ends.
    while let Some(frame) = incoming.next().await {
        let payload = frame.unwrap().into_data();
        async_std::io::stdout().write_all(&payload).await.unwrap();
    }
}
/// Entry point: blocks the current thread until the client future finishes.
fn main() {
    let client = run();
    task::block_on(client)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36 | use std::env;
use futures::{future, pin_mut, StreamExt};
use async_std::prelude::*;
use async_std::task;
use async_tungstenite::async_std::connect_async;
/// Connects to a WebSocket endpoint, prints every received payload to stdout
/// and forwards anything queued on an in-memory channel to the server.
///
/// The URL is taken from the first command-line argument; when none is given
/// the built-in default endpoint is used. The function returns as soon as
/// either direction (forwarding or printing) finishes.
///
/// # Panics
///
/// Panics if the connection cannot be established, if a received message is
/// an error, or if writing to stdout fails.
async fn run() {
    let url = match env::args().nth(1) {
        Some(arg) => arg,
        None => "ws://hq.sinajs.cn/wskt?list=s_sh000001".to_string(),
    };

    // The sender is not handed to any producer here, so nothing is ever
    // queued; it stays alive until this function returns, which keeps
    // `stdin_rx` open.
    let (stdin_tx, stdin_rx) = futures::channel::mpsc::unbounded();

    let (socket, _) = connect_async(&url).await.expect("Failed to connect");
    println!("WebSocket handshake has been successfully completed");

    let (sink, mut source) = socket.split();

    // Outbound direction: channel -> socket.
    let outbound = stdin_rx.map(Ok).forward(sink);

    // Inbound direction: socket -> stdout.
    let inbound = async {
        while let Some(frame) = source.next().await {
            let payload = frame.unwrap().into_data();
            async_std::io::stdout().write_all(&payload).await.unwrap();
        }
    };

    // Both futures must be pinned before they can be raced against each other.
    pin_mut!(outbound, inbound);
    future::select(outbound, inbound).await;
}
/// Entry point: blocks the current thread until the client future finishes.
fn main() {
    let client = run();
    task::block_on(client)
}
|
服务端
服务端向客户端发送一条消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 | use std::{
env,
io::Error as IoError,
net::SocketAddr,
};
use futures::prelude::*;
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
};
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use tungstenite::protocol::Message;
/// Serves one accepted TCP connection: upgrades it to a WebSocket and sends
/// the client a single text message, `"hhhh"`.
///
/// * `raw_stream` - the freshly accepted TCP stream.
/// * `addr` - the peer's address, used only in log output.
///
/// A failed handshake or a failed send is reported on stderr and ends the
/// handler instead of panicking.
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    // The peer is untrusted: a bad handshake is logged, not treated as a bug.
    let ws_stream = match async_tungstenite::accept_async(raw_stream).await {
        Ok(stream) => stream,
        Err(err) => {
            eprintln!("WebSocket handshake with {} failed: {}", addr, err);
            return;
        }
    };
    println!("WebSocket connection established: {}", addr);

    // Queue the greeting on an in-memory channel; the forwarder below drains
    // the channel into the socket.
    let (tx, rx) = unbounded();
    tx.unbounded_send(Message::Text("hhhh".to_string())).ok();

    // The read half is never polled in this handler; the underscore-prefixed
    // binding keeps it alive for the handler's lifetime without a warning.
    let (outgoing, _incoming) = ws_stream.split();

    // `tx` is not dropped before this await, so the channel stays open: after
    // the greeting the forwarder keeps waiting for more items unless the sink
    // reports an error.
    if let Err(err) = rx.map(Ok).forward(outgoing).await {
        eprintln!("Sending to {} failed: {}", addr, err);
    }

    println!("{} disconnected", &addr);
}
/// Binds a TCP listener and spawns one `handle_connection` task per accepted
/// connection.
///
/// The listen address is the first command-line argument, defaulting to
/// `127.0.0.1:8081`. The accept loop has no normal exit, so this function
/// only returns when an error occurs.
///
/// # Errors
///
/// Returns the I/O error if binding the address or accepting a connection
/// fails.
async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());

    // Propagate a bind failure to the caller rather than panicking.
    let listener = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    // Each connection is handled in its own task so a slow peer cannot stall
    // the accept loop. An accept error is surfaced instead of being reported
    // as a successful shutdown.
    loop {
        let (stream, peer) = listener.accept().await?;
        task::spawn(handle_connection(stream, peer));
    }
}
/// Entry point: blocks on the server future and hands its result back as the
/// outcome of the process.
fn main() -> Result<(), IoError> {
    let server = run();
    task::block_on(server)
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68 | use std::{env, io::Error as IoError, net::SocketAddr};
use futures::prelude::*;
use futures::channel::mpsc::unbounded;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use tungstenite::protocol::Message;
/// Serves one accepted TCP connection as a WebSocket echo peer.
///
/// After the handshake the client is sent the text message `"hhhh"`. From
/// then on every text message is printed and echoed back, every ping is
/// answered with a pong carrying the same payload, pong and binary messages
/// are ignored, and a close message ends the session.
///
/// * `raw_stream` - the freshly accepted TCP stream.
/// * `addr` - the peer's address, used only in log output.
///
/// A failed handshake or a read error is reported on stderr and ends the
/// handler instead of panicking.
async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr) {
    println!("Incoming TCP connection from: {}", addr);

    // The peer is untrusted: a bad handshake is logged, not treated as a bug.
    let ws_stream = match async_tungstenite::accept_async(raw_stream).await {
        Ok(stream) => stream,
        Err(err) => {
            eprintln!("WebSocket handshake with {} failed: {}", addr, err);
            return;
        }
    };
    println!("WebSocket connection established: {}", addr);

    // All outgoing messages go through this channel; the forwarder below
    // drains it into the socket's write half.
    let (tx, rx) = unbounded();
    tx.unbounded_send(Message::Text("hhhh".to_string())).ok();

    let (outgoing, mut incoming) = ws_stream.split();
    let task_forward_write = rx.map(Ok).forward(outgoing);

    let task_read = async {
        while let Some(item) = incoming.next().await {
            // A read error ends the session cleanly instead of panicking.
            let message = match item {
                Ok(message) => message,
                Err(err) => {
                    eprintln!("Reading from {} failed: {}", addr, err);
                    break;
                }
            };
            match message {
                Message::Text(s) => {
                    println!("{}", s);
                    // `s` is already an owned String, so it is moved, not cloned.
                    tx.unbounded_send(Message::Text(s)).ok();
                }
                Message::Ping(v) => {
                    tx.unbounded_send(Message::Pong(v)).ok();
                }
                Message::Pong(_) | Message::Binary(_) => (),
                Message::Close(_) => break,
            }
        }
    };

    // Whichever side finishes first ends the handler. The futures are passed
    // as expressions so that `select!` pins them itself.
    futures::select! {
        _ = task_read.fuse() => (),
        _ = task_forward_write.fuse() => (),
    };

    println!("{} disconnected", &addr);
}
/// Binds a TCP listener and spawns one `handle_connection` task per accepted
/// connection.
///
/// The listen address is the first command-line argument, defaulting to
/// `127.0.0.1:8081`. The accept loop has no normal exit, so this function
/// only returns when an error occurs.
///
/// # Errors
///
/// Returns the I/O error if binding the address or accepting a connection
/// fails.
async fn run() -> Result<(), IoError> {
    let addr = env::args()
        .nth(1)
        .unwrap_or_else(|| "127.0.0.1:8081".to_string());

    // Propagate a bind failure to the caller rather than panicking.
    let listener = TcpListener::bind(&addr).await?;
    println!("Listening on: {}", addr);

    // Each connection is handled in its own task so a slow peer cannot stall
    // the accept loop. An accept error is surfaced instead of being reported
    // as a successful shutdown.
    loop {
        let (stream, peer) = listener.accept().await?;
        task::spawn(handle_connection(stream, peer));
    }
}
/// Entry point: blocks on the server future and hands its result back as the
/// outcome of the process.
fn main() -> Result<(), IoError> {
    let server = run();
    task::block_on(server)
}
|
tokio_tungstenite
https://docs.rs/tokio-tungstenite/0.10.1/tokio_tungstenite/index.html
https://github.com/snapview/tokio-tungstenite