Function warp::filters::sse::reply

source · []
pub fn reply<S>(event_stream: S) -> impl Reply where
    S: TryStream<Ok = Event> + Send + 'static,
    S::Error: StdError + Send + Sync + 'static, 
Expand description

Server-sent events reply

This function converts stream of server events into a Reply with:

  • Status of 200 OK
  • Header content-type: text/event-stream
  • Header cache-control: no-cache.

Example


use std::time::Duration;
use futures::Stream;
use futures::stream::iter;
use std::convert::Infallible;
use warp::{Filter, sse::Event};
use serde_derive::Serialize;

#[derive(Serialize)]
struct Msg {
    from: u32,
    text: String,
}

fn event_stream() -> impl Stream<Item = Result<Event, Infallible>> {
        iter(vec![
            // Unnamed event with data only
            Ok(Event::default().data("payload")),
            // Named event with ID and retry timeout
            Ok(
                Event::default().data("other message\nwith next line")
                .event("chat")
                .id(1.to_string())
                .retry(Duration::from_millis(15000))
            ),
            // Event with JSON data
            Ok(
                Event::default().id(2.to_string())
                .json_data(Msg {
                    from: 2,
                    text: "hello".into(),
                }).unwrap(),
            )
        ])
}

async {
    let app = warp::path("sse").and(warp::get()).map(|| {
       warp::sse::reply(event_stream())
    });

    let res = warp::test::request()
        .method("GET")
        .header("Connection", "Keep-Alive")
        .path("/sse")
        .reply(&app)
        .await
        .into_body();

    assert_eq!(
        res,
        r#"data:payload

event:chat
data:other message
data:with next line
id:1
retry:15000

data:{"from":2,"text":"hello"}
id:2

"#
    );
};
Examples found in repository?
examples/sse.rs (line 27)
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
async fn main() {
    pretty_env_logger::init();

    let routes = warp::path("ticks").and(warp::get()).map(|| {
        let mut counter: u64 = 0;
        // create server event source
        let interval = interval(Duration::from_secs(1));
        let stream = IntervalStream::new(interval);
        let event_stream = stream.map(move |_| {
            counter += 1;
            sse_counter(counter)
        });
        // reply using server-sent events
        warp::sse::reply(event_stream)
    });

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
More examples
Hide additional examples
examples/sse_chat.rs (line 43)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async fn main() {
    pretty_env_logger::init();

    // Keep track of all connected users, key is usize, value
    // is an event stream sender.
    let users = Arc::new(Mutex::new(HashMap::new()));
    // Turn our "state" into a new Filter...
    let users = warp::any().map(move || users.clone());

    // POST /chat -> send message
    let chat_send = warp::path("chat")
        .and(warp::post())
        .and(warp::path::param::<usize>())
        .and(warp::body::content_length_limit(500))
        .and(
            warp::body::bytes().and_then(|body: bytes::Bytes| async move {
                std::str::from_utf8(&body)
                    .map(String::from)
                    .map_err(|_e| warp::reject::custom(NotUtf8))
            }),
        )
        .and(users.clone())
        .map(|my_id, msg, users| {
            user_message(my_id, msg, &users);
            warp::reply()
        });

    // GET /chat -> messages stream
    let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
        // reply using server-sent events
        let stream = user_connected(users);
        warp::sse::reply(warp::sse::keep_alive().stream(stream))
    });

    // GET / -> index html
    let index = warp::path::end().map(|| {
        warp::http::Response::builder()
            .header("content-type", "text/html; charset=utf-8")
            .body(INDEX_HTML)
    });

    let routes = index.or(chat_recv).or(chat_send);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}