pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy
Expand description

Returns a Filter that matches any request and extracts a Future of a concatenated body.

The contents of the body will be flattened into a single contiguous Bytes, which may require memory copies. If you don’t require a contiguous buffer, using aggregate can give better performance.

Warning

This does not have a default size limit; it would be wise to use one to prevent an overly large request from using too much memory.

Example

use warp::{Buf, Filter};

// Apply a content-length limit of 32 KiB (1024 * 32 bytes) ahead of the
// body filter, since `bytes()` itself has no default size limit.
let size_limit = warp::body::content_length_limit(1024 * 32);

// Extract the whole body as one `Bytes` value and print it.
let route = size_limit
    .and(warp::body::bytes())
    .map(|body: bytes::Bytes| {
        println!("bytes = {:?}", body);
    });
Examples found in repository?
src/filters/body.rs (line 175)
173
174
175
176
177
178
179
180
181
182
pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
    is_content_type::<Json>()
        .and(bytes())
        .and_then(|buf| async move {
            Json::decode(buf).map_err(|err| {
                tracing::debug!("request json body error: {}", err);
                reject::known(BodyDeserializeError { cause: err })
            })
        })
}
More examples
Hide additional examples
src/filters/multipart.rs (line 89)
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
    /// Builds the future that extracts a `FormData` from the request.
    ///
    /// The pipeline is: apply `content_length_limit(self.max_length)`, read
    /// the `boundary` parameter out of the `ContentType` header, buffer the
    /// body with `body::bytes()`, and wrap body + boundary in a `Multipart`.
    ///
    /// A content type without a `boundary` parameter yields
    /// `reject::invalid_header("content-type")`.
    fn filter(&self, _: Internal) -> Self::Future {
        // Pull the multipart boundary string from the Content-Type header.
        let boundary = super::header::header2::<ContentType>().and_then(|content_type| {
            let parsed = Mime::from(content_type);
            let param = parsed.get_param("boundary").map(|v| v.to_string());
            future::ready(param.ok_or_else(|| reject::invalid_header("content-type")))
        });

        // The length limit runs first, then the boundary, then the body bytes.
        let pipeline = super::body::content_length_limit(self.max_length)
            .and(boundary)
            .and(super::body::bytes())
            .map(|boundary, payload| {
                let inner = Multipart::with_body(Cursor::new(payload), boundary);
                FormData { inner }
            });

        Box::pin(pipeline.filter(Internal))
    }
examples/sse_chat.rs (line 27)
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/// Entry point of the SSE chat example.
///
/// Sets up three routes and serves them on 127.0.0.1:3030:
/// - `POST /chat/<usize>`: passes the UTF-8 body to `user_message`
/// - `GET /chat`: replies with the server-sent event stream from `user_connected`
/// - `GET /`: replies with `INDEX_HTML`
async fn main() {
    pretty_env_logger::init();

    // Registry of connected users, keyed by usize; each value is an event
    // stream sender.
    let registry = Arc::new(Mutex::new(HashMap::new()));
    // Filter that hands every request its own clone of the shared registry.
    let with_users = warp::any().map(move || registry.clone());

    // Body filter: the request body must be valid UTF-8, otherwise the
    // request is rejected with `NotUtf8`.
    let utf8_body = warp::body::bytes().and_then(|raw: bytes::Bytes| async move {
        let text = std::str::from_utf8(&raw).map(String::from);
        text.map_err(|_e| warp::reject::custom(NotUtf8))
    });

    // POST /chat -> send message (content length limited to 500)
    let chat_send = warp::path("chat")
        .and(warp::post())
        .and(warp::path::param::<usize>())
        .and(warp::body::content_length_limit(500))
        .and(utf8_body)
        .and(with_users.clone())
        .map(|sender_id, text, users| {
            user_message(sender_id, text, &users);
            warp::reply()
        });

    // GET /chat -> messages stream, replied as server-sent events
    let chat_recv = warp::path("chat")
        .and(warp::get())
        .and(with_users)
        .map(|users| warp::sse::reply(warp::sse::keep_alive().stream(user_connected(users))));

    // GET / -> index html
    let index = warp::path::end().map(|| {
        let page = warp::http::Response::builder();
        page.header("content-type", "text/html; charset=utf-8")
            .body(INDEX_HTML)
    });

    // Routes are combined in this order: index, then the stream, then the sender.
    let routes = index.or(chat_recv).or(chat_send);
    let server = warp::serve(routes);

    server.run(([127, 0, 0, 1], 3030)).await;
}