1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
use std::net::SocketAddr;
use socket2::Domain;
use tokio::net::{TcpListener, TcpStream};
/// A stream of incoming connections.
///
/// [`IncomingStream::bind`] is the primary entrypoint for constructing a new [`IncomingStream`].
///
/// Incoming connections will be usually passed to a [`Server`](super::Server) instance to be handled.
/// Check out [`Server::bind`](super::Server::bind) or
/// [`Server::listen`](super::Server::listen) for more information.
pub struct IncomingStream {
listener: TcpListener,
}
impl IncomingStream {
/// Create a new [`IncomingStream`] by binding to a socket address.
/// The socket will be configured to be non-blocking and reuse the address.
///
/// # Example
///
/// ```rust
/// use std::net::SocketAddr;
/// use pavex::server::{IncomingStream, Server};
///
/// # async fn t() -> std::io::Result<()> {
/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
/// let incoming = IncomingStream::bind(addr).await?;
/// # Ok(())
/// # }
/// ````
///
/// # Custom configuration
///
/// If you want to customize the options set on the socket, you can build your own
/// [`TcpListener`](std::net::TcpListener) using [`socket2::Socket`] and then convert it
/// into an [`IncomingStream`] via [`TryFrom::try_from`].
/// There's only one option you can't change: the socket will always be set to non-blocking
/// mode.
///
/// ```rust
/// use std::net::SocketAddr;
/// use socket2::Domain;
/// use pavex::server::{IncomingStream, Server};
///
/// # async fn t() -> std::io::Result<()> {
/// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
///
/// let socket = socket2::Socket::new(
/// Domain::for_address(addr),
/// socket2::Type::STREAM,
/// Some(socket2::Protocol::TCP),
/// )
/// .expect("Failed to create a socket");
/// socket.set_reuse_address(true)?;
/// socket.set_nonblocking(true)?;
/// socket.bind(&addr.into())?;
/// // We customize the backlog setting!
/// socket.listen(2048_i32)?;
///
/// let listener = std::net::TcpListener::from(socket);
/// let incoming: IncomingStream = listener.try_into()?;
/// # Ok(())
/// # }
/// ````
pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
let socket = socket2::Socket::new(
Domain::for_address(addr),
socket2::Type::STREAM,
Some(socket2::Protocol::TCP),
)
.expect("Failed to create a socket");
socket.set_reuse_address(true)?;
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
socket.listen(1024_i32)?;
let listener = std::net::TcpListener::from(socket);
Ok(Self {
listener: TcpListener::from_std(listener)?,
})
}
/// Returns the address that this [`IncomingStream`] is bound to.
pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
// The address we bound to may not be the same as the one we requested.
// This happens, for example, when binding to port 0—this will cause the OS to pick a random
// port for us which we won't know unless we call `local_addr` on the listener.
self.listener.local_addr()
}
/// Accepts a new incoming connection from the underlying listener.
///
/// This function will yield once a new TCP connection is established. When
/// established, the corresponding [`TcpStream`] and the remote peer's
/// address will be returned.
///
/// # Example
///
/// ```no_run
/// use pavex::server::IncomingStream;
/// use std::net::SocketAddr;
///
/// # async fn t() -> std::io::Result<()> {
/// let address = SocketAddr::from(([127, 0, 0, 1], 8080));
/// let incoming = IncomingStream::bind(address).await?;
///
/// match incoming.accept().await {
/// Ok((_socket, addr)) => println!("new client: {:?}", addr),
/// Err(e) => println!("couldn't get client: {:?}", e),
/// }
/// # Ok(())
/// # }
/// ```
pub async fn accept(&self) -> std::io::Result<(TcpStream, SocketAddr)> {
self.listener.accept().await
}
}
impl TryFrom<std::net::TcpListener> for IncomingStream {
type Error = std::io::Error;
fn try_from(v: std::net::TcpListener) -> std::io::Result<Self> {
// This is *very* important!
// `tokio` won't automatically set the socket to non-blocking mode for us, so we have to do
// it ourselves.
// Forgetting to set the socket to non-blocking mode will likely result in
// mysterious server hangs.
v.set_nonblocking(true)?;
Ok(Self {
listener: TcpListener::from_std(v)?,
})
}
}
impl From<TcpListener> for IncomingStream {
fn from(v: TcpListener) -> Self {
Self { listener: v }
}
}