pavex/server/incoming.rs
1use std::net::SocketAddr;
2
3use socket2::Domain;
4use tokio::net::{TcpListener, TcpStream};
5
6/// A stream of incoming connections.
7///
8/// [`IncomingStream::bind`] is the primary entrypoint for constructing a new [`IncomingStream`].
9///
10/// Incoming connections will be usually passed to a [`Server`](super::Server) instance to be handled.
11/// Check out [`Server::bind`](super::Server::bind) or
12/// [`Server::listen`](super::Server::listen) for more information.
13pub struct IncomingStream {
14 listener: TcpListener,
15}
16
17impl IncomingStream {
18 /// Create a new [`IncomingStream`] by binding to a socket address.
19 /// The socket will be configured to be non-blocking and reuse the address.
20 ///
21 /// # Example
22 ///
23 /// ```rust
24 /// use std::net::SocketAddr;
25 /// use pavex::server::{IncomingStream, Server};
26 ///
27 /// # async fn t() -> std::io::Result<()> {
28 /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
29 /// let incoming = IncomingStream::bind(addr).await?;
30 /// # Ok(())
31 /// # }
32 /// ````
33 ///
34 /// # Custom configuration
35 ///
36 /// If you want to customize the options set on the socket, you can build your own
37 /// [`TcpListener`](std::net::TcpListener) using [`socket2::Socket`] and then convert it
38 /// into an [`IncomingStream`] via [`TryFrom::try_from`].
39 /// There's only one option you can't change: the socket will always be set to non-blocking
40 /// mode.
41 ///
42 /// ```rust
43 /// use std::net::SocketAddr;
44 /// use socket2::Domain;
45 /// use pavex::server::{IncomingStream, Server};
46 ///
47 /// # async fn t() -> std::io::Result<()> {
48 /// let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
49 ///
50 /// let socket = socket2::Socket::new(
51 /// Domain::for_address(addr),
52 /// socket2::Type::STREAM,
53 /// Some(socket2::Protocol::TCP),
54 /// )
55 /// .expect("Failed to create a socket");
56 /// socket.set_reuse_address(true)?;
57 /// socket.set_nonblocking(true)?;
58 /// socket.bind(&addr.into())?;
59 /// // We customize the backlog setting!
60 /// socket.listen(2048_i32)?;
61 ///
62 /// let listener = std::net::TcpListener::from(socket);
63 /// let incoming: IncomingStream = listener.try_into()?;
64 /// # Ok(())
65 /// # }
66 /// ````
67 pub async fn bind(addr: SocketAddr) -> std::io::Result<Self> {
68 let socket = socket2::Socket::new(
69 Domain::for_address(addr),
70 socket2::Type::STREAM,
71 Some(socket2::Protocol::TCP),
72 )
73 .expect("Failed to create a socket");
74
75 socket.set_reuse_address(true)?;
76 socket.set_nonblocking(true)?;
77 socket.bind(&addr.into())?;
78 socket.listen(1024_i32)?;
79
80 let listener = std::net::TcpListener::from(socket);
81 Ok(Self {
82 listener: TcpListener::from_std(listener)?,
83 })
84 }
85
86 /// Returns the address that this [`IncomingStream`] is bound to.
87 pub fn local_addr(&self) -> std::io::Result<SocketAddr> {
88 // The address we bound to may not be the same as the one we requested.
89 // This happens, for example, when binding to port 0—this will cause the OS to pick a random
90 // port for us which we won't know unless we call `local_addr` on the listener.
91 self.listener.local_addr()
92 }
93
94 /// Accepts a new incoming connection from the underlying listener.
95 ///
96 /// This function will yield once a new TCP connection is established. When
97 /// established, the corresponding [`TcpStream`] and the remote peer's
98 /// address will be returned.
99 ///
100 /// # Example
101 ///
102 /// ```no_run
103 /// use pavex::server::IncomingStream;
104 /// use std::net::SocketAddr;
105 ///
106 /// # async fn t() -> std::io::Result<()> {
107 /// let address = SocketAddr::from(([127, 0, 0, 1], 8080));
108 /// let incoming = IncomingStream::bind(address).await?;
109 ///
110 /// match incoming.accept().await {
111 /// Ok((_socket, addr)) => println!("new client: {:?}", addr),
112 /// Err(e) => println!("couldn't get client: {:?}", e),
113 /// }
114 /// # Ok(())
115 /// # }
116 /// ```
117 pub async fn accept(&self) -> std::io::Result<(TcpStream, SocketAddr)> {
118 self.listener.accept().await
119 }
120}
121
122impl TryFrom<std::net::TcpListener> for IncomingStream {
123 type Error = std::io::Error;
124
125 fn try_from(v: std::net::TcpListener) -> std::io::Result<Self> {
126 // This is *very* important!
127 // `tokio` won't automatically set the socket to non-blocking mode for us, so we have to do
128 // it ourselves.
129 // Forgetting to set the socket to non-blocking mode will likely result in
130 // mysterious server hangs.
131 v.set_nonblocking(true)?;
132 Ok(Self {
133 listener: TcpListener::from_std(v)?,
134 })
135 }
136}
137
138impl From<TcpListener> for IncomingStream {
139 fn from(v: TcpListener) -> Self {
140 Self { listener: v }
141 }
142}