pavex/server/server.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
use std::future::Future;
use std::net::SocketAddr;
use crate::connection::ConnectionInfo;
use crate::server::configuration::ServerConfiguration;
use crate::server::server_handle::ServerHandle;
use super::IncomingStream;
/// An HTTP server to handle incoming connections for Pavex applications.
/// It handles both HTTP1 and HTTP2 connections.
///
/// # Example
///
/// ```rust
/// use std::net::SocketAddr;
/// use pavex::server::Server;
///
/// # #[derive(Clone)] struct ApplicationState;
/// # async fn router(_req: hyper::Request<hyper::body::Incoming>, _conn_info: Option<pavex::connection::ConnectionInfo>, _state: ApplicationState) -> pavex::response::Response { todo!() }
/// # async fn t() -> std::io::Result<()> {
/// # let application_state = ApplicationState;
/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
///
/// Server::new()
/// .bind(addr)
/// .await?
/// // Both the routing function and the application state will usually
/// // be code-generated by Pavex, starting from your `Blueprint`.
/// // You don't have to define them manually!
/// .serve(router, application_state)
/// // The `serve` method returns a `ServerHandle` that you can use to
/// // interact with the server.
/// // Calling `.await` on the handle lets you wait until the server
/// // shuts down.
/// .await;
/// # Ok(())
/// # }
/// ```
///
/// # Configuration
///
/// [`Server::new`] returns a new [`Server`] with default configuration.
/// You can customize the server default settings by creating your own [`ServerConfiguration`]
/// and invoking [`Server::set_config`].
///
/// # Architecture
///
/// By default, [`Server::serve`] creates a worker per CPU core and distributes connection from an
/// acceptor thread using a round-robin strategy.
///
/// Each worker has its own single-threaded [`tokio`] runtime—there is no work stealing across
/// workers.
/// Each worker takes care to invoke your routing and request handling logic, with the help
/// of [`hyper`].
#[must_use = "You must call `serve` on a `Server` to start listening for incoming connections"]
pub struct Server {
config: ServerConfiguration,
incoming: Vec<IncomingStream>,
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
impl Server {
/// Create a new [`Server`] with default configuration.
pub fn new() -> Self {
Self {
config: ServerConfiguration::default(),
incoming: Vec::new(),
}
}
/// Configure this [`Server`] according to the values set in the [`ServerConfiguration`]
/// passed as input parameter.
/// It will overwrite any previous configuration set on this [`Server`].
///
/// If you want to retrieve the current configuration, use [`Server::get_config`].
pub fn set_config(mut self, config: ServerConfiguration) -> Self {
self.config = config;
self
}
/// Get a reference to the [`ServerConfiguration`] for this [`Server`].
///
/// If you want to overwrite the existing configuration, use [`Server::set_config`].
pub fn get_config(&self) -> &ServerConfiguration {
&self.config
}
/// Bind the server to the given address: the server will accept incoming connections from this
/// address when started.
/// Binding an address may fail (e.g. if the address is already in use), therefore this method
/// may return an error.
///
/// # Related
///
/// Check out [`Server::listen`] for an alternative binding mechanism as well as a
/// discussion of the pros and cons of [`Server::bind`] vs [`Server::listen`].
///
/// # Note
///
/// A [`Server`] can be bound to multiple addresses: just call this method multiple times with
/// all the addresses you want to bind to.
///
/// # Example: bind one address
///
/// ```rust
/// use std::net::SocketAddr;
/// use pavex::server::Server;
///
/// # async fn t() -> std::io::Result<()> {
/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
///
/// Server::new()
/// .bind(addr)
/// .await?
/// # ;
/// // [...]
/// # Ok(())
/// # }
/// ```
///
/// # Example: bind multiple addresses
///
/// ```rust
/// use std::net::SocketAddr;
/// use pavex::server::Server;
///
/// # async fn t() -> std::io::Result<()> {
/// let addr1 = SocketAddr::from(([127, 0, 0, 1], 8080));
/// let addr2 = SocketAddr::from(([127, 0, 0, 1], 4000));
///
/// Server::new()
/// .bind(addr1)
/// .await?
/// .bind(addr2)
/// .await?
/// # ;
/// // [...]
/// # Ok(())
/// # }
/// ````
pub async fn bind(mut self, addr: SocketAddr) -> std::io::Result<Self> {
let incoming = IncomingStream::bind(addr).await?;
self.incoming.push(incoming);
Ok(self)
}
/// Ask the server to process incoming connections from the provided [`IncomingStream`].
///
/// # [`Server::listen`] vs [`Server::bind`]
///
/// [`Server::bind`] only requires you to specify the address you want to listen at. The
/// socket configuration is handled by the [`Server`], with a set of reasonable default
/// parameters. You have no access to the [`IncomingStream`] that gets bound to the address
/// you specified.
///
/// [`Server::listen`], instead, expects an [`IncomingStream`].
/// You are free to configure the socket as you see please and the [`Server`] will just
/// poll it for incoming connections.
/// It also allows you to interact with the bound [`IncomingStream`] directly
///
/// # Example: bind to a random port
///
/// ```rust
/// use std::net::SocketAddr;
/// use pavex::server::{IncomingStream, Server};
///
/// # async fn t() -> std::io::Result<()> {
/// // `0` is a special port: it tells the OS to assign us
/// // a random **unused** port
/// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
/// let incoming = IncomingStream::bind(addr).await?;
/// // We can then retrieve the actual port we were assigned
/// // by the OS.
/// let addr = incoming.local_addr()?.to_owned();
///
/// Server::new()
/// .listen(incoming);
/// # ;
/// // [...]
/// # Ok(())
/// # }
/// ````
///
/// # Example: set a custom socket backlog
///
/// ```rust
/// use std::net::SocketAddr;
/// use socket2::Domain;
/// use pavex::server::{IncomingStream, Server};
///
/// # async fn t() -> std::io::Result<()> {
/// // `0` is a special port: it tells the OS to assign us
/// // a random **unused** port
/// let addr = SocketAddr::from(([127, 0, 0, 1], 0));
///
/// let socket = socket2::Socket::new(
/// Domain::for_address(addr),
/// socket2::Type::STREAM,
/// Some(socket2::Protocol::TCP),
/// )
/// .expect("Failed to create a socket");
/// socket.set_reuse_address(true)?;
/// socket.set_nonblocking(true)?;
/// socket.bind(&addr.into())?;
/// // The custom backlog!
/// socket.listen(2048_i32)?;
///
/// let listener = std::net::TcpListener::from(socket);
/// Server::new()
/// .listen(listener.try_into()?)
/// # ;
/// // [...]
/// # Ok(())
/// # }
/// ````
///
/// # Note
///
/// A [`Server`] can listen to multiple streams of incoming connections: just call this method
/// multiple times!
pub fn listen(mut self, incoming: IncomingStream) -> Self {
self.incoming.push(incoming);
self
}
/// Start listening for incoming connections.
///
/// You must specify:
///
/// - a handler function, which will be called for each incoming request;
/// - the application state, the set of singleton components that will be available to
/// your handler function.
///
/// Both the handler function and the application state are usually code-generated by Pavex
/// starting from your [`Blueprint`](crate::blueprint::Blueprint).
///
/// # Wait for the server to shut down
///
/// `serve` returns a [`ServerHandle`].
/// Calling `.await` on the handle lets you wait until the server shuts down.
///
/// # Panics
///
/// This method will panic if the [`Server`] has no registered source of incoming connections,
/// i.e. if you did not call [`Server::bind`] or [`Server::listen`] before calling `serve`.
pub fn serve<HandlerFuture, ApplicationState>(
self,
handler: fn(
http::Request<hyper::body::Incoming>,
Option<ConnectionInfo>,
ApplicationState,
) -> HandlerFuture,
application_state: ApplicationState,
) -> ServerHandle
where
HandlerFuture: Future<Output = crate::response::Response> + 'static,
ApplicationState: Clone + Send + Sync + 'static,
{
if self.incoming.is_empty() {
panic!("Cannot serve: there is no source of incoming connections. Please call `bind` or `listen` on the server before calling `serve`.");
}
ServerHandle::new(self.config, self.incoming, handler, application_state)
}
}