pub struct Server<F> { /* private fields */ }
Expand description
A Warp Server ready to filter requests.
Implementations
sourceimpl<F> Server<F> where
F: Filter + Clone + Send + Sync + 'static,
<F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject,
impl<F> Server<F> where
F: Filter + Clone + Send + Sync + 'static,
<F::Future as TryFuture>::Ok: Reply,
<F::Future as TryFuture>::Error: IsReject,
sourcepub async fn run(self, addr: impl Into<SocketAddr>)
pub async fn run(self, addr: impl Into<SocketAddr>)
Run this `Server` forever on the current thread.
Examples found in repository?
More examples
sourcepub async fn run_incoming<I>(self, incoming: I) where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
pub async fn run_incoming<I>(self, incoming: I) where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
Run this `Server` forever on the current thread with a specific stream of incoming connections.
This can be used for Unix Domain Sockets, or TLS, etc.
sourcepub fn bind(
self,
addr: impl Into<SocketAddr> + 'static
) -> impl Future<Output = ()> + 'static
pub fn bind(
self,
addr: impl Into<SocketAddr> + 'static
) -> impl Future<Output = ()> + 'static
Bind to a socket address, returning a `Future` that can be executed on any runtime.
Panics
Panics if we are unable to bind to the provided address.
sourcepub async fn try_bind(self, addr: impl Into<SocketAddr>)
pub async fn try_bind(self, addr: impl Into<SocketAddr>)
Bind to a socket address, returning a `Future` that can be executed on any runtime.
If we are unable to bind to the specified address, the reason is logged and the future completes with `()`; as the signature shows, no error value is returned to the caller.
sourcepub fn bind_ephemeral(
self,
addr: impl Into<SocketAddr>
) -> (SocketAddr, impl Future<Output = ()> + 'static)
pub fn bind_ephemeral(
self,
addr: impl Into<SocketAddr>
) -> (SocketAddr, impl Future<Output = ()> + 'static)
Bind to a possibly ephemeral socket address.
Returns the bound address and a `Future` that can be executed on any runtime.
Panics
Panics if we are unable to bind to the provided address.
Examples found in repository?
131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176
/// Run this `Server` forever on the current thread.
///
/// The address is bound through `bind_ephemeral`, the bound address is
/// logged, and the serving future is then awaited inside a `Server::run`
/// tracing span that records the address in its `addr` field.
///
/// # Panics
///
/// Panics if we are unable to bind to the provided address, because
/// binding is delegated to `bind_ephemeral`.
pub async fn run(self, addr: impl Into<SocketAddr>) {
    let (bound_addr, serving) = self.bind_ephemeral(addr);
    // The span field is named explicitly so it is still recorded as `addr`.
    let run_span = tracing::info_span!("Server::run", addr = ?bound_addr);
    tracing::info!(parent: &run_span, "listening on http://{}", bound_addr);
    serving.instrument(run_span).await;
}
/// Run this `Server` forever on the current thread, accepting connections
/// from a caller-supplied stream instead of a bound socket address.
///
/// This can be used for Unix Domain Sockets, or TLS, etc.
///
/// Each successfully accepted connection is wrapped in
/// `crate::transport::LiftIo` before being handed to the internal serving
/// routine, and the whole operation runs inside a `Server::run_incoming`
/// tracing span.
pub async fn run_incoming<I>(self, incoming: I)
where
    I: TryStream + Send,
    I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    let lifted = incoming.map_ok(crate::transport::LiftIo).into_stream();
    let serving = self.run_incoming2(lifted);
    let span = tracing::info_span!("Server::run_incoming");
    serving.instrument(span).await;
}
/// Serve connections from `incoming`, whose items already implement
/// `Transport`.
///
/// The serving future is constructed first, the "listening" event is
/// emitted, and only then is the future awaited.
async fn run_incoming2<I>(self, incoming: I)
where
    I: TryStream + Send,
    I::Ok: Transport + Send + 'static + Unpin,
    I::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    let serving = self.serve_incoming2(incoming);
    tracing::info!("listening with custom incoming");
    serving.await;
}
/// Bind to a socket address and hand back the serving `Future`, which can
/// be executed on any runtime.
///
/// This is `bind_ephemeral` with the bound address discarded.
///
/// # Panics
///
/// Panics if we are unable to bind to the provided address.
pub fn bind(self, addr: impl Into<SocketAddr> + 'static) -> impl Future<Output = ()> + 'static {
    // Element 0 is the bound `SocketAddr`; only the future (element 1) is returned.
    self.bind_ephemeral(addr).1
}
More examples
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564
/// Perform a WebSocket upgrade handshake against the filter `f`.
///
/// A server built from `f` is bound to an ephemeral port on `127.0.0.1`
/// and spawned, the stored request is sent to it with WebSocket upgrade
/// headers, and the upgraded connection is bridged to a pair of unbounded
/// channels held by the returned `WsClient`.
///
/// # Errors
///
/// Returns a `WsError` wrapping the underlying error if the request or the
/// connection upgrade fails.
///
/// # Panics
///
/// Panics if the spawned handshake task drops its result sender without
/// reporting an outcome.
pub async fn handshake<F>(self, f: F) -> Result<WsClient, WsError>
where
F: Filter + Clone + Send + Sync + 'static,
F::Extract: Reply + Send,
F::Error: IsReject + Send,
{
// One-shot channel used by the spawned task to report the upgrade outcome.
let (upgraded_tx, upgraded_rx) = oneshot::channel();
// `wr_*`: items the client wants written to the socket.
let (wr_tx, wr_rx) = mpsc::unbounded_channel();
let wr_rx = UnboundedReceiverStream::new(wr_rx);
// `rd_*`: items read from the socket, delivered to the client.
let (rd_tx, rd_rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
use tokio_tungstenite::tungstenite::protocol;
// Port 0 requests an ephemeral port; `addr` is the address actually bound.
let (addr, srv) = crate::serve(f).bind_ephemeral(([127, 0, 0, 1], 0));
// Add the upgrade headers. The key is a fixed value that base64-decodes
// to "the sample nonce".
let mut req = self
.req
.header("connection", "upgrade")
.header("upgrade", "websocket")
.header("sec-websocket-version", "13")
.header("sec-websocket-key", "dGhlIHNhbXBsZSBub25jZQ==")
.req;
// Preserve the original query string (if any) when rewriting the URI.
let query_string = match req.uri().query() {
Some(q) => format!("?{}", q),
None => String::from(""),
};
// Point the request at the freshly bound local server.
let uri = format!("http://{}{}{}", addr, req.uri().path(), query_string)
.parse()
.expect("addr + path is valid URI");
*req.uri_mut() = uri;
// Run the server as its own task so it can answer the request below.
tokio::spawn(srv);
// Send the request through a client that connects to `addr`, then wait
// for the connection upgrade on the response.
let upgrade = ::hyper::Client::builder()
.build(AddrConnect(addr))
.request(req)
.and_then(|res| hyper::upgrade::on(res));
let upgraded = match upgrade.await {
Ok(up) => {
// Report success; a failed send is deliberately ignored.
let _ = upgraded_tx.send(Ok(()));
up
}
Err(err) => {
// Report the failure and stop; there is no socket to drive.
let _ = upgraded_tx.send(Err(err));
return;
}
};
// Wrap the upgraded connection as the client side of a WebSocket.
let ws = crate::ws::WebSocket::from_raw_socket(
upgraded,
protocol::Role::Client,
Default::default(),
)
.await;
let (tx, rx) = ws.split();
// Forward everything from the write channel into the socket sink,
// discarding the forwarding result.
let write = wr_rx.map(Ok).forward(tx).map(|_| ());
// Deliver incoming items until the first error or close message;
// neither of those is forwarded.
let read = rx
.take_while(|result| match result {
Err(_) => future::ready(false),
Ok(m) => future::ready(!m.is_close()),
})
.for_each(move |item| {
// Panics with "ws receive error" if the send fails.
rd_tx.send(item).expect("ws receive error");
future::ready(())
});
// Drive both directions until each has finished.
future::join(write, read).await;
});
// Wait for the spawned task to report the outcome of the upgrade.
match upgraded_rx.await {
Ok(Ok(())) => Ok(WsClient {
tx: wr_tx,
rx: rd_rx,
}),
Ok(Err(err)) => Err(WsError::new(err)),
// The sender was dropped without sending a result.
Err(_canceled) => panic!("websocket handshake thread panicked"),
}
}
sourcepub fn try_bind_ephemeral(
self,
addr: impl Into<SocketAddr>
) -> Result<(SocketAddr, impl Future<Output = ()> + 'static), Error>
pub fn try_bind_ephemeral(
self,
addr: impl Into<SocketAddr>
) -> Result<(SocketAddr, impl Future<Output = ()> + 'static), Error>
Tries to bind to a possibly ephemeral socket address.
Returns a `Result` which fails with the underlying error in case we are unable to bind.
On success, returns the bound address and a `Future` that can be executed on any runtime.
sourcepub fn bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Output = ()> + Send + 'static
) -> (SocketAddr, impl Future<Output = ()> + 'static)
pub fn bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Output = ()> + Send + 'static
) -> (SocketAddr, impl Future<Output = ()> + 'static)
Create a server with graceful shutdown signal.
When the signal completes, the server will start the graceful shutdown process.
Returns the bound address and a `Future` that can be executed on any runtime.
Example
use warp::Filter;
use futures::future::TryFutureExt;
use tokio::sync::oneshot;
// A filter that answers every request with a fixed string.
let routes = warp::any()
.map(|| "Hello, World!");
// `tx` is used below to trigger the shutdown; `rx` is awaited by the signal.
let (tx, rx) = oneshot::channel();
let (addr, server) = warp::serve(routes)
.bind_with_graceful_shutdown(([127, 0, 0, 1], 3030), async {
// The shutdown signal: wait on `rx`, discarding the result with `.ok()`.
rx.await.ok();
});
// Spawn the server into a runtime
tokio::task::spawn(server);
// Later, start the shutdown...
let _ = tx.send(());
sourcepub fn try_bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Output = ()> + Send + 'static
) -> Result<(SocketAddr, impl Future<Output = ()> + 'static), Error>
pub fn try_bind_with_graceful_shutdown(
self,
addr: impl Into<SocketAddr> + 'static,
signal: impl Future<Output = ()> + Send + 'static
) -> Result<(SocketAddr, impl Future<Output = ()> + 'static), Error>
Create a server with graceful shutdown signal.
When the signal completes, the server will start the graceful shutdown process.
sourcepub fn serve_incoming<I>(self, incoming: I) -> impl Future<Output = ()> where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
pub fn serve_incoming<I>(self, incoming: I) -> impl Future<Output = ()> where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
Set up this `Server` with a specific stream of incoming connections.
This can be used for Unix Domain Sockets, or TLS, etc.
Returns a `Future` that can be executed on any runtime.
sourcepub fn serve_incoming_with_graceful_shutdown<I>(
self,
incoming: I,
signal: impl Future<Output = ()> + Send + 'static
) -> impl Future<Output = ()> where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
pub fn serve_incoming_with_graceful_shutdown<I>(
self,
incoming: I,
signal: impl Future<Output = ()> + Send + 'static
) -> impl Future<Output = ()> where
I: TryStream + Send,
I::Ok: AsyncRead + AsyncWrite + Send + 'static + Unpin,
I::Error: Into<Box<dyn StdError + Send + Sync>>,
Set up this `Server` with a specific stream of incoming connections and a signal to initiate graceful shutdown.
This can be used for Unix Domain Sockets, or TLS, etc.
When the signal completes, the server will start the graceful shutdown process.
Returns a `Future` that can be executed on any runtime.
Trait Implementations
Auto Trait Implementations
impl<F> RefUnwindSafe for Server<F> where
F: RefUnwindSafe,
impl<F> Send for Server<F> where
F: Send,
impl<F> Sync for Server<F> where
F: Sync,
impl<F> Unpin for Server<F> where
F: Unpin,
impl<F> UnwindSafe for Server<F> where
F: UnwindSafe,
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcepub fn borrow_mut(&mut self) -> &mut T
pub fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Instruments this type with the provided `Span`, returning an `Instrumented` wrapper. Read more
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> Same<T> for T
impl<T> Same<T> for T
type Output = T
type Output = T
Should always be Self