pavex/server/
incoming.rs

1use std::net::SocketAddr;
2
3use socket2::Domain;
4use tokio::net::{TcpListener, TcpStream};
5
6/// A stream of incoming connections.  
7///
8/// [`IncomingStream::bind`] is the primary entrypoint for constructing a new [`IncomingStream`].
9///
10/// Incoming connections will be usually passed to a [`Server`](super::Server) instance to be handled.
11/// Check out [`Server::bind`](super::Server::bind) or
12/// [`Server::listen`](super::Server::listen) for more information.
13pub struct IncomingStream {
14    listener: TcpListener,
15}
16
17impl IncomingStream {
18    /// Create a new [`IncomingStream`] by binding to a socket address.  
19    /// The socket will be configured to be non-blocking and reuse the address.
20    ///
21    /// # Example
22    ///
23    /// ```rust
24    /// use std::net::SocketAddr;
25    /// use pavex::server::{IncomingStream, Server};
26    ///
27    /// # async fn t() -> std::io::Result<()> {
28    /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
29    /// let incoming = IncomingStream::bind(addr).await?;
30    /// # Ok(())
31    /// # }
32    /// ````
33    ///
34    /// # Custom configuration
35    ///
36    /// If you want to customize the options set on the socket, you can build your own
37    /// [`TcpListener`](std::net::TcpListener) using [`socket2::Socket`] and then convert it
38    /// into an [`IncomingStream`] via [`TryFrom::try_from`].  
39    /// There's only one option you can't change: the socket will always be set to non-blocking
40    /// mode.
41    ///
42    /// ```rust
43    /// use std::net::SocketAddr;
44    /// use socket2::Domain;
45    /// use pavex::server::{IncomingStream, Server};
46    ///
47    /// # async fn t() -> std::io::Result<()> {
48    /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
49    ///
50    /// let socket = socket2::Socket::new(
51    ///    Domain::for_address(addr),
52    ///    socket2::Type::STREAM,
53    ///    Some(socket2::Protocol::TCP),
54    /// )
55    /// .expect("Failed to create a socket");
56    /// socket.set_reuse_address(true)?;
57    /// socket.set_nonblocking(true)?;
58    /// socket.bind(&addr.into())?;
59    /// // We customize the backlog setting!
60    /// socket.listen(2048_i32)?;
61    ///
62    /// let listener = std::net::TcpListener::from(socket);
63    /// let incoming: IncomingStream = listener.try_into()?;
64    /// # Ok(())
65    /// # }
66    /// ````
67    pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
68        let socket = socket2::Socket::new(
69            Domain::for_address(addr),
70            socket2::Type::STREAM,
71            Some(socket2::Protocol::TCP),
72        )
73        .expect("Failed to create a socket");
74
75        socket.set_reuse_address(true)?;
76        socket.set_nonblocking(true)?;
77        socket.bind(&addr.into())?;
78        socket.listen(1024_i32)?;
79
80        let listener = std::net::TcpListener::from(socket);
81        Ok(Self {
82            listener: TcpListener::from_std(listener)?,
83        })
84    }
85
86    /// Returns the address that this [`IncomingStream`] is bound to.
87    pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
88        // The address we bound to may not be the same as the one we requested.
89        // This happens, for example, when binding to port 0—this will cause the OS to pick a random
90        // port for us which we won't know unless we call `local_addr` on the listener.
91        self.listener.local_addr()
92    }
93
94    /// Accepts a new incoming connection from the underlying listener.
95    ///
96    /// This function will yield once a new TCP connection is established. When
97    /// established, the corresponding [`TcpStream`] and the remote peer's
98    /// address will be returned.
99    ///
100    /// # Example
101    ///
102    /// ```no_run
103    /// use pavex::server::IncomingStream;
104    /// use std::net::SocketAddr;
105    ///
106    /// # async fn t() -> std::io::Result<()> {
107    /// let address = SocketAddr::from(([127, 0, 0, 1], 8080));
108    /// let incoming = IncomingStream::bind(address).await?;
109    ///
110    /// match incoming.accept().await {
111    ///     Ok((_socket, addr)) => println!("new client: {:?}", addr),
112    ///     Err(e) => println!("couldn't get client: {:?}", e),
113    /// }
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub async fn accept(&self) -> std::io::Result<(TcpStream, SocketAddr)> {
118        self.listener.accept().await
119    }
120}
121
122impl TryFrom<std::net::TcpListener> for IncomingStream {
123    type Error = std::io::Error;
124
125    fn try_from(v: std::net::TcpListener) -> std::io::Result<Self> {
126        // This is *very* important!
127        // `tokio` won't automatically set the socket to non-blocking mode for us, so we have to do
128        // it ourselves.
129        // Forgetting to set the socket to non-blocking mode will likely result in
130        // mysterious server hangs.
131        v.set_nonblocking(true)?;
132        Ok(Self {
133            listener: TcpListener::from_std(v)?,
134        })
135    }
136}
137
138impl From<TcpListener> for IncomingStream {
139    fn from(v: TcpListener) -> Self {
140        Self { listener: v }
141    }
142}