Skip to content

websocket

服务端

1
2
3
4
5
[dependencies]
actix-web = "2.0"
actix-rt = "1.0.0"
actix = "0.9.0"
actix-web-actors = "2.0.0"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
use actix::{Actor, StreamHandler};
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;

/// Define http actor
struct MyWs;

impl Actor for MyWs {
    type Context = ws::WebsocketContext<Self>;
}

/// Handler for ws::Message message
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for MyWs {
    fn handle(
        &mut self,
        msg: Result<ws::Message, ws::ProtocolError>,
        ctx: &mut Self::Context,
    ) {
        println!("WS: {:?}", msg);
        match msg {
            Ok(ws::Message::Ping(msg)) => ctx.pong(&msg),
            Ok(ws::Message::Text(text)) => ctx.text(text),
            Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
            _ => (),
        }
    }
}

async fn index(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    let resp = ws::start(MyWs {}, &req, stream);
    resp
}

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/", web::get().to(index)))
        .bind("127.0.0.1:8088")?
        .run()
        .await
}

通过ws://localhost:8088/连接

客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
[dependencies]
chrono = { version = "0.4", features = ["serde"] }
actix-web = "2.0"
actix-rt = "1.0.0"
actix = "0.9.0"
actix-web-actors = "2.0.0"
bytes = "0.5.3"
futures = "0.3.1"
awc = "1.0.1"
actix-codec = "0.2.0"
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
//! Simple websocket client.
use std::time::Duration;

use actix::io::SinkWrite;
use actix::*;
use actix_codec::Framed;
use awc::{
    error::WsProtocolError,
    ws::{Codec, Frame, Message},
    BoxedSocket, Client,
};
use bytes::Bytes;
use futures::stream::{SplitSink, StreamExt};
use chrono::prelude::*;

struct WebsocketClient {
    sink_write: SinkWrite<Message, SplitSink<Framed<BoxedSocket, Codec>, Message>>,
}

impl Actor for WebsocketClient {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Context<Self>) {
        // start heartbeats otherwise server will disconnect after 10 seconds
        self.hb(ctx)
    }

    fn stopped(&mut self, _: &mut Context<Self>) {
        println!("Disconnected");
        let local = Local::now();
        println!("{}", local);
        // Stop application on disconnect
        System::current().stop();
    }
}

impl WebsocketClient {
    fn hb(&self, ctx: &mut Context<Self>) {
        ctx.run_later(Duration::new(1, 0), |act, ctx| {
            act.sink_write.write(Message::Ping(Bytes::from_static(b""))).unwrap();
            act.hb(ctx);

            // client should also check for a timeout here, similar to the
            // server code
        });
    }
}

/// Handle server websocket messages
impl StreamHandler<Result<Frame, WsProtocolError>> for WebsocketClient {
    fn handle(&mut self, msg: Result<Frame, WsProtocolError>, _: &mut Context<Self>) {
        if let Ok(Frame::Text(txt)) = msg {
            println!("Server: {:?}", txt)
        }
    }

    fn started(&mut self, _ctx: &mut Context<Self>) {
        println!("Connected");
    }

    fn finished(&mut self, ctx: &mut Context<Self>) {
        println!("StreamHandler disconnected");
        let local = Local::now();
        println!("{}", local);
        ctx.stop()
    }
}

impl actix::io::WriteHandler<WsProtocolError> for WebsocketClient {}

fn main() {
    let sys = System::new("websocket-client");
    let local: DateTime<Local> = Local::now();
    println!("{}", local);
    Arbiter::spawn(async {
        let (_, framed) = Client::new()
            // .ws("http://127.0.0.1:8080/ws/")
            .ws("ws://hq.sinajs.cn/wskt?list=s_sh000001")
            .connect()
            .await
            .map_err(|e| {
                println!("Error: {}", e);
            })
            .unwrap();

        let (sink, stream) = framed.split();
        WebsocketClient::create(|ctx| {
            WebsocketClient::add_stream(stream, ctx);
            WebsocketClient{ sink_write: SinkWrite::new(sink, ctx) }
        });
    });
    sys.run().unwrap();
}

服务端主动推消息给所有已经连接的客户端

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[package]
name = "rs_test"
version = "0.1.0"
authors = ["zhaoyz <954241552@qq.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html


[[bin]]
name = "server"
path = "src/main.rs"

[dependencies]
rand = "0.6"
futures = "0.1.24"
actix = "0.8.2"
actix-web = "1.0"
actix-files = "0.1"
actix-web-actors = "1.0"
actix-broker = "0.2.0"
log = "0.4.5"
simple_logger = "0.5.0"

server.rs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use actix::prelude::*;
use actix_broker::BrokerSubscribe;
use rand;

use std::collections::HashMap;
use std::mem;
use actix_web_actors::ws;
use std::fmt;


#[derive(Clone, Message)]
pub struct ChatMessage(pub String);

// https://docs.rs/actix/0.9.0/actix/struct.Recipient.html?search=
type Client = Recipient<ChatMessage>;

#[derive(Default)]
pub struct WsChatServer {
    Clients: HashMap<Client, ()>,
}

impl WsChatServer {
    fn add_client_to_room(&mut self, client: Client) {
        self.Clients.insert(client, ());
    }

    fn send_msg(&mut self, msg: &str) {
        let mut no_connected_clients:Vec<Client> = Vec::new();
        for (client, _) in self.Clients.iter_mut() {
            println!("{}", client.connected());
            if !client.connected() {
                no_connected_clients.push(client.to_owned());
            } else {
                client.do_send(ChatMessage(msg.to_owned()));
            }

        }
        for client in no_connected_clients.iter() {
            self.Clients.remove(client);
        }
    }
}

impl Actor for WsChatServer {
    type Context = Context<Self>;

    // https://docs.rs/actix-broker/0.2.1/actix_broker/trait.BrokerSubscribe.html
    fn started(&mut self, ctx: &mut Self::Context) {
        // self.subscribe_system_async::<LeaveRoom>(ctx);
        // self.subscribe_system_async::<SendMessage>(ctx);
    }
}

#[derive(Clone, Message)]
pub struct JoinRoom(pub Recipient<ChatMessage>);

impl Handler<JoinRoom> for WsChatServer {
    type Result = ();

    fn handle(&mut self, msg: JoinRoom, _ctx: &mut Self::Context) {
        println!("server handle hhhh");
        let JoinRoom(client) = msg;
        self.Clients.insert(client, ());
        println!("{}", self.Clients.len());
        let build_string = (self.Clients.len()).to_string();
        self.send_msg(&build_string[..]);
    }
}



impl SystemService for WsChatServer {}
impl Supervised for WsChatServer {}

mina.rs:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#[macro_use]
extern crate log;

use actix::fut;
use actix::prelude::*;
use actix_broker::BrokerIssue;
use actix_files::Files;
use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer};
use actix_web_actors::ws;

mod server;
use server::*;

fn chat_route(req: HttpRequest, stream: web::Payload) -> Result<HttpResponse, Error> {
    ws::start(WsChatSession::default(), &req, stream)
}

#[derive(Default)]
struct WsChatSession {
    id: usize,
    room: String,
    name: Option<String>,
}

impl WsChatSession {
    fn join_room(&mut self, room_name: &str, ctx: &mut ws::WebsocketContext<Self>) {
        // First send a leave message for the current room
        // issue_sync comes from having the `BrokerIssue` trait in scope.
        // Then send a join message for the new room

        let join_msg = JoinRoom(
            ctx.address().recipient(),
        );

        println!("send msg");
        // https://docs.rs/actix/0.5.6/actix/registry/trait.SystemService.html
        WsChatServer::from_registry()
            .send(join_msg)
            .into_actor(self)
            .then(|id, act, _ctx| {
                fut::ok(())
            })
            .spawn(ctx);
        // https://docs.rs/actix/0.5.6/actix/trait.AsyncContext.html#tymethod.spawn
    }
}

impl Actor for WsChatSession {
    type Context = ws::WebsocketContext<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        println!("join room hhh");
        self.join_room("Main", ctx);
    }

    fn stopped(&mut self, ctx: &mut Self::Context) {
        info!(
            "WsChatSession closed for {}({}) in room {}",
            self.name.clone().unwrap_or_else(|| "anon".to_string()),
            self.id,
            self.room
        );
        println!("has leave room");
    }
}

impl Handler<ChatMessage> for WsChatSession {
    type Result = ();

    fn handle(&mut self, msg: ChatMessage, ctx: &mut Self::Context) {
        ctx.text(msg.0);
    }
}

impl StreamHandler<ws::Message, ws::ProtocolError> for WsChatSession {
    fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {

    }
}

fn main() -> std::io::Result<()> {
    let sys = actix::System::new("websocket-broker-example");
    simple_logger::init_with_level(log::Level::Info).unwrap();

    HttpServer::new(move || {
        App::new()
            .service(web::resource("/").to(chat_route))
    })
    .bind("127.0.0.1:8088")
    .unwrap()
    .start();

    info!("Started http server: 127.0.0.1:8088");
    sys.run()
}