pub fn keep_alive() -> KeepAlive
Expand description
Keeps event source connection alive when no events sent over a some time.
Some proxy servers may drop HTTP connection after a some timeout of inactivity.
This function helps to prevent such behavior by sending comment events every
keep_interval
of inactivity.
By default the comment is :
(an empty comment) and the time interval between
events is 15 seconds. Both may be customized using the builder pattern
as shown below.
use std::time::Duration;
use std::convert::Infallible;
use futures::StreamExt;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use warp::{Filter, Stream, sse::Event};
// create server-sent event
fn sse_counter(counter: u64) -> Result<Event, Infallible> {
Ok(Event::default().data(counter.to_string()))
}
fn main() {
let routes = warp::path("ticks")
.and(warp::get())
.map(|| {
let mut counter: u64 = 0;
let interval = interval(Duration::from_secs(15));
let stream = IntervalStream::new(interval);
let event_stream = stream.map(move |_| {
counter += 1;
sse_counter(counter)
});
// reply using server-sent events
let stream = warp::sse::keep_alive()
.interval(Duration::from_secs(5))
.text("thump".to_string())
.stream(event_stream);
warp::sse::reply(stream)
});
}
See notes.
Examples found in repository?
examples/sse_chat.rs (line 43)
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
async fn main() {
pretty_env_logger::init();
// Keep track of all connected users, key is usize, value
// is an event stream sender.
let users = Arc::new(Mutex::new(HashMap::new()));
// Turn our "state" into a new Filter...
let users = warp::any().map(move || users.clone());
// POST /chat -> send message
let chat_send = warp::path("chat")
.and(warp::post())
.and(warp::path::param::<usize>())
.and(warp::body::content_length_limit(500))
.and(
warp::body::bytes().and_then(|body: bytes::Bytes| async move {
std::str::from_utf8(&body)
.map(String::from)
.map_err(|_e| warp::reject::custom(NotUtf8))
}),
)
.and(users.clone())
.map(|my_id, msg, users| {
user_message(my_id, msg, &users);
warp::reply()
});
// GET /chat -> messages stream
let chat_recv = warp::path("chat").and(warp::get()).and(users).map(|users| {
// reply using server-sent events
let stream = user_connected(users);
warp::sse::reply(warp::sse::keep_alive().stream(stream))
});
// GET / -> index html
let index = warp::path::end().map(|| {
warp::http::Response::builder()
.header("content-type", "text/html; charset=utf-8")
.body(INDEX_HTML)
});
let routes = index.or(chat_recv).or(chat_send);
warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}