pub fn reply<S>(event_stream: S) -> impl Reply where
S: TryStream<Ok = Event> + Send + 'static,
S::Error: StdError + Send + Sync + 'static,
Expand description
Server-sent events reply
This function converts a stream of server events into a `Reply` with:
- Status of `200 OK`
- Header `content-type: text/event-stream`
- Header `cache-control: no-cache`
Example
use std::time::Duration;
use futures::Stream;
use futures::stream::iter;
use std::convert::Infallible;
use warp::{Filter, sse::Event};
use serde_derive::Serialize;
/// Payload that the example serializes to JSON for the data of its last event.
#[derive(Serialize)]
struct Msg {
    /// Presumably the sender's identifier (the example uses `2`).
    from: u32,
    /// Message text.
    text: String,
}
/// Builds a finite stream of three sample server-sent events.
///
/// Every item is `Ok`, which is why the error type is `Infallible`.
///
/// # Panics
///
/// Panics if `Msg` cannot be serialized to JSON (the `unwrap` on `json_data`).
fn event_stream() -> impl Stream<Item = Result<Event, Infallible>> {
    // Unnamed event: data only, no name, id or retry hint.
    let plain = Event::default().data("payload");

    // Event named "chat" with two lines of data, an id and a retry timeout.
    let chat = Event::default()
        .data("other message\nwith next line")
        .event("chat")
        .id(1.to_string())
        .retry(Duration::from_millis(15000));

    // Event whose data is a `Msg` serialized as JSON.
    let hello = Msg {
        from: 2,
        text: "hello".into(),
    };
    let json = Event::default()
        .id(2.to_string())
        .json_data(hello)
        .unwrap();

    iter(vec![Ok(plain), Ok(chat), Ok(json)])
}
// Drive the reply end-to-end with warp's test client and check the exact
// body produced for the three sample events from `event_stream()`.
async {
    // GET /sse -> server-sent events reply
    let app = warp::path("sse").and(warp::get()).map(|| {
        warp::sse::reply(event_stream())
    });

    let res = warp::test::request()
        .method("GET")
        .header("Connection", "Keep-Alive")
        .path("/sse")
        .reply(&app)
        .await
        .into_body();

    // In the event-stream format every event is written as its field lines
    // followed by an empty line that terminates the event. The expected body
    // therefore has a blank line after each of the three events, including
    // the last one.
    assert_eq!(
        res,
        r#"data:payload

event:chat
data:other message
data:with next line
id:1
retry:15000

data:{"from":2,"text":"hello"}
id:2

"#
    );
};
Examples found in repository?
examples/sse.rs (line 27)
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
/// Serves `GET /ticks` on 127.0.0.1:3030 as a server-sent events stream that
/// emits one event per second.
async fn main() {
    pretty_env_logger::init();

    let routes = warp::path("ticks").and(warp::get()).map(|| {
        // Each request gets its own counter, starting from zero.
        let mut ticks_sent: u64 = 0;

        // Event source: a timer firing once per second, wrapped as a stream.
        let every_second = IntervalStream::new(interval(Duration::from_secs(1)));

        // Bump the counter on every tick and pass it to `sse_counter`
        // (defined elsewhere in the example) to build the stream item.
        let events = every_second.map(move |_| {
            ticks_sent += 1;
            sse_counter(ticks_sent)
        });

        // Reply using server-sent events.
        warp::sse::reply(events)
    });

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
More examples
examples/sse_chat.rs (line 43)
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
/// Entry point of the SSE chat example: builds the index, receive and send
/// routes and serves them on 127.0.0.1:3030.
async fn main() {
    pretty_env_logger::init();

    // Registry of all connected users; the key is a usize and the value is
    // an event stream sender.
    let registry = Arc::new(Mutex::new(HashMap::new()));

    // Expose the shared state as a Filter that yields a fresh clone of the
    // registry handle each time it runs.
    let users = warp::any().map(move || registry.clone());

    // GET / -> index html
    let index = warp::path::end().map(|| {
        warp::http::Response::builder()
            .header("content-type", "text/html; charset=utf-8")
            .body(INDEX_HTML)
    });

    // GET /chat -> messages stream, replied as server-sent events wrapped
    // in warp's keep-alive.
    let chat_recv = warp::path("chat")
        .and(warp::get())
        .and(users.clone())
        .map(|users| warp::sse::reply(warp::sse::keep_alive().stream(user_connected(users))));

    // Request body as a `String`; rejected with `NotUtf8` when the bytes are
    // not valid UTF-8.
    let utf8_body = warp::body::bytes().and_then(|body: bytes::Bytes| async move {
        std::str::from_utf8(&body)
            .map(String::from)
            .map_err(|_e| warp::reject::custom(NotUtf8))
    });

    // POST /chat/<usize> -> send message (content length limited to 500)
    let chat_send = warp::path("chat")
        .and(warp::post())
        .and(warp::path::param::<usize>())
        .and(warp::body::content_length_limit(500))
        .and(utf8_body)
        .and(users)
        .map(|my_id, msg, users| {
            user_message(my_id, msg, &users);
            warp::reply()
        });

    let routes = index.or(chat_recv).or(chat_send);
    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}