pub fn keep_alive() -> KeepAlive
Expand description

Keeps the event source connection alive when no events have been sent for some time.

Some proxy servers may drop an HTTP connection after a period of inactivity. This function helps prevent that by sending a comment event after every `keep_interval` of inactivity.

By default the comment is `:` (an empty comment) and the time interval between events is 15 seconds. Both may be customized using the builder pattern, as shown below.

use std::time::Duration;
use std::convert::Infallible;
use futures::StreamExt;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use warp::{Filter, Stream, sse::Event};

// Build a server-sent event whose data field is the counter rendered as
// decimal text. The error type is `Infallible`, so this always yields `Ok`.
fn sse_counter(counter: u64) -> Result<Event, Infallible> {
    let payload = counter.to_string();
    let event = Event::default().data(payload);
    Ok(event)
}

fn main() {
    // GET /ticks -> stream of counter events, kept alive between ticks
    let routes = warp::path("ticks").and(warp::get()).map(|| {
        let mut ticks_seen: u64 = 0;
        // a tick every 15 seconds drives the real events
        let ticker = IntervalStream::new(interval(Duration::from_secs(15)));
        let events = ticker.map(move |_| {
            ticks_seen += 1;
            sse_counter(ticks_seen)
        });
        // customize the keep-alive: a "thump" comment after 5 seconds
        // without an event, instead of the defaults
        let keep_alive = warp::sse::keep_alive()
            .interval(Duration::from_secs(5))
            .text("thump".to_string());
        // reply using server-sent events
        warp::sse::reply(keep_alive.stream(events))
    });
}

See notes.

Examples found in repository?
examples/sse_chat.rs (line 43)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Entry point of the SSE chat example: sets up the logger, builds three
// routes (index page, message stream, message post) and serves them on
// 127.0.0.1:3030.
async fn main() {
    pretty_env_logger::init();

    // Keep track of all connected users, key is usize, value
    // is an event stream sender.
    let users = Arc::new(Mutex::new(HashMap::new()));
    // Turn our "state" into a new Filter...
    // (each time the filter runs it hands out a clone of the shared Arc)
    let users = warp::any().map(move || users.clone());

    // POST /chat -> send message
    // The path also carries a `usize` segment, which is handed to
    // `user_message` as `my_id` together with the body text.
    let chat_send = warp::path("chat")
        .and(warp::post())
        .and(warp::path::param::<usize>())
        // cap the accepted request body (content length limit of 500)
        .and(warp::body::content_length_limit(500))
        .and(
            // read the raw body and require valid UTF-8; anything else is
            // turned into a custom `NotUtf8` rejection
            warp::body::bytes().and_then(|body: bytes::Bytes| async move {
                std::str::from_utf8(&body)
                    .map(String::from)
                    .map_err(|_e| warp::reject::custom(NotUtf8))
            }),
        )
        // clone the state filter here: `users` itself is consumed by
        // `chat_recv` below
        .and(users.clone())
        .map(|my_id, msg, users| {
            user_message(my_id, msg, &users);
            warp::reply()
        });

    // GET /chat -> messages stream
    let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
        // reply using server-sent events
        let stream = user_connected(users);
        // wrap the stream in a keep-alive with no interval/text overrides
        warp::sse::reply(warp::sse::keep_alive().stream(stream))
    });

    // GET / -> index html
    let index = warp::path::end().map(|| {
        warp::http::Response::builder()
            .header("content-type", "text/html; charset=utf-8")
            .body(INDEX_HTML)
    });

    // Combine the three routes with `or` into one filter to serve.
    let routes = index.or(chat_recv).or(chat_send);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}