I/O loop #
Before changing the code, let’s outline our goals for I/O loops:
- Preserve backpressure propagation: blocked writes should slow down the local producer over the `rx` channel.
- Retain concurrency: other work (e.g., cancellation, timeouts) should make progress while reads/writes are pending. Reads and writes should not block each other.
Backpressure propagation #
To address the first issue we could flip the perspective: instead of eagerly pulling from the `rx` channel, wait until the writer proves it has capacity. Some async primitives support this. For example, `tokio::sync::mpsc::Sender` offers `reserve()`, which awaits until a slot becomes available for writing. Once the permit is granted, we can dequeue the message from our `rx` channel. One important caveat: `mpsc` is slot-based rather than byte-sized, which is usually not what we need when working with I/O streams.
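As a rough illustration of how `reserve()` pushes back on a producer (a minimal sketch; the channel size and message type are arbitrary):

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(4);

    // Producer: reserve() suspends until a slot is free, so a slow consumer
    // naturally slows this loop down.
    tokio::spawn(async move {
        for i in 0..16u8 {
            match tx.reserve().await {
                // The permit guarantees a slot, so send() cannot fail or block.
                Ok(permit) => permit.send(vec![i]),
                Err(_) => break, // receiver dropped
            }
        }
    });

    // Consumer: drain slowly to make the backpressure visible.
    while let Some(msg) = rx.recv().await {
        println!("got {:?}", msg);
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}
```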
Tokio’s TCP stream provides something closer to `reserve()`: the `writable()` method, which internally uses `epoll` and `EPOLLOUT` to get notified when the socket can accept writes.
You can reasonably ask: how much capacity does “writable” guarantee? On Linux, there is `SO_SNDLOWAT`, which is effectively fixed at one byte for TCP and cannot be tuned, so readiness alone isn’t a full solution.
You can certainly build I/O loops using this signal, but it’s usually not optimal because:
- As you can see from the documentation and the example, writability doesn’t always mean that the subsequent send will succeed.
- Readiness doesn’t report how much space is available, which may be less than the size of the encoded message. Additional logic is required to handle partial writes. There is a way to query this size, but it takes a syscall, which is expensive.
- The API is TCP-specific and will not work out of the box for generic Tokio stream I/O, which is typically bound to the `AsyncRead` + `AsyncWrite` traits.
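Handling these caveats with `writable()` ends up looking roughly like this (a sketch only; shutdown and timeouts omitted):

```rust
use std::io;
use tokio::net::TcpStream;

// Write one already-encoded message using readiness notifications.
async fn write_with_readiness(stream: &TcpStream, msg: &[u8]) -> io::Result<()> {
    let mut written = 0;
    while written < msg.len() {
        // Resolves when epoll reports EPOLLOUT for this socket.
        stream.writable().await?;
        match stream.try_write(&msg[written..]) {
            // Partial writes are normal: readiness says nothing about how many
            // bytes the send buffer can actually accept.
            Ok(n) => written += n,
            // Readiness can be spurious; wait for the next event.
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}
```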
Another idea is to use a smart write buffer that wraps around the stream I/O writer. It should be able to report when it’s ready to write, and expose an async readiness-like call you can `await`. Its “ready to write” signal effectively triggers a data flush and propagates backpressure. This is exactly how the `Sink` trait is designed, with its `poll_ready()` and `start_send()` methods.
Tokio already provides several implementations of this pattern, some of which we’ll use later when discussing Framed I/O.
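A minimal sketch of driving a `Sink` from a channel via the `futures::SinkExt` helpers (the item type and names are illustrative):

```rust
use futures::{Sink, SinkExt};
use tokio::sync::mpsc;

// Forward messages from an mpsc channel into any Sink.
async fn forward<S>(mut rx: mpsc::Receiver<Vec<u8>>, mut sink: S) -> Result<(), S::Error>
where
    S: Sink<Vec<u8>> + Unpin,
{
    while let Some(msg) = rx.recv().await {
        // feed() awaits poll_ready() -- which may flush buffered bytes first --
        // and only then hands the item over with start_send(). That await is
        // where backpressure reaches this loop (and, through the bounded
        // channel, the producer behind it).
        sink.feed(msg).await?;
    }
    // Flush whatever is still buffered before the sink is dropped.
    sink.flush().await?;
    Ok(())
}
```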
Another approach (which we are not going to use) is to handle backpressure on the consumer’s side. In this model, the consumer explicitly requests N records or bytes by pulling them from the producer. Apple’s ServiceTalk framework for Java uses this pattern, for example. Another example is HTTP/2 streams, where the consumer dictates how much data it wants to receive with WINDOW_UPDATE frames. However, this approach is not available for plain TCP streams or generic I/O streams, so we will not cover it further.
Concurrency #
Concurrency might look like low-hanging fruit since we’re already using the Tokio framework, but the reality is harsher. Rust’s ownership model and borrow checker prevent us from writing a simple, naïve fix:
loop {
    tokio::select! {
        Some(msg) = rx.recv() => {
            tokio::select! { // <--------- change
                res = stream.write_all(&msg) => { /* ... */ },
                res = stream.read(&mut read_buf) => { /* ... */ },
            }
            stream.flush().await?;
            println!("client's written");
        }
        res = stream.read(&mut read_buf) => {
            let n = res?;
            if n == 0 {
                // EOF - server closed connection
                eprintln!("Server closed the connection.");
                break; // exit
            }
            print!("{}", String::from_utf8_lossy(&read_buf[..n]));
        }
        else => break
    }
}
The compiler error:
cannot borrow `stream` as mutable more than once at a time
second mutable borrow occurs here (rustc E0499)
The `AsyncRead` and `AsyncWrite` trait methods require a mutable reference to `self` (more precisely, `Pin<&mut Self>`, though `Pin`/`Unpin` is out of scope for this post). We can’t place both operations in `select!` expressions and await them concurrently, because that would require borrowing the same value mutably twice. So we need a different approach that satisfies the Rust compiler.
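One common way to satisfy the borrow checker is to split the stream into its read and write halves and drive them from separate tasks, e.g. with `tokio::io::split()`. A sketch (names are illustrative; this buys compile-time independence at the cost of the cross-task overheads discussed below):

```rust
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

async fn run(stream: TcpStream, mut rx: mpsc::Receiver<Vec<u8>>) -> std::io::Result<()> {
    // Each half is an independent value, so there is no double mutable borrow.
    let (mut reader, mut writer) = tokio::io::split(stream);

    let write_task = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            writer.write_all(&msg).await?;
            writer.flush().await?;
        }
        Ok::<_, std::io::Error>(())
    });

    let read_task = tokio::spawn(async move {
        let mut buf = vec![0u8; 4096];
        loop {
            let n = reader.read(&mut buf).await?;
            if n == 0 {
                break; // EOF - server closed the connection
            }
            print!("{}", String::from_utf8_lossy(&buf[..n]));
        }
        Ok::<_, std::io::Error>(())
    });

    let (w, r) = tokio::join!(write_task, read_task);
    w.unwrap()?;
    r.unwrap()?;
    Ok(())
}
```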
An additional interesting question is what concurrency actually means for our network application.
A high-load network application, such as a web server or a network proxy, that must handle thousands of simultaneous connections (the C10k problem) has few practical options and usually uses an event loop (for example, `epoll` on Linux).
Note: There is also an `io_uring` approach, but the current Tokio version offers only limited support for it.
As a rule of thumb, there is typically one event loop per CPU core, and the application maps sockets to these threads according to its scheduling strategy.
The implications of such a design are:
- Parallelism is bounded by the number of `epoll` threads. True parallel execution cannot exceed the number of event-loop threads available.
- Developers must decide how to handle socket I/O: the read and write halves can run concurrently on the same thread, or in truly parallel fashion, where two independent threads operate on the same socket.
The answer to the latter is not obvious. While parallel execution may appear superior, it introduces several overheads and limitations:
- Cross-thread synchronization overhead at the system level
- Asynchronous task-scheduler overhead (e.g., the Tokio runtime)
- CPU cache effects, including misses and cache-line contention
For an I/O-intensive application, it is often more efficient to keep both halves of a socket within a single scheduled task (thread).
This is the default approach recommended by both the Rust Tokio runtime (as we saw earlier with the `select!` macro) and Go.
In practice, stream I/O is typically handled inside a single actor, which then communicates with other parts of the application through asynchronous channels or other low-overhead synchronization primitives.
Let’s take a quick look at some concurrency primitives Tokio provides: `spawn()`, the `join!` and `try_join!` macros, and the `select!` macro.
Note: `futures::stream::FuturesUnordered` can be very useful too, but it’s out of scope here.
Key distinction: `spawn()` launches a future on the runtime’s thread pool as a separate Tokio task. The other primitives run futures concurrently within the current Tokio task.
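To make the distinction concrete, a toy sketch (function names are made up for the example):

```rust
use std::time::Duration;
use tokio::time::sleep;

async fn step(name: &str) -> &str {
    sleep(Duration::from_millis(10)).await;
    name
}

#[tokio::main]
async fn main() {
    // spawn(): a separate Tokio task that may run on another worker thread;
    // the future must therefore be 'static and Send.
    let handle = tokio::spawn(step("spawned"));

    // join!: both futures are polled concurrently *inside the current task*;
    // they may borrow local data, but they never run in parallel with each other.
    let local = String::from("joined");
    let (a, b) = tokio::join!(step(&local), step(&local));

    println!("{} {} {}", handle.await.unwrap(), a, b);
}
```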
One more thing to note before we start: `try_join!`, like all `try_`-prefixed macros, operates on `Result`s and returns as soon as the first error occurs, cancelling the remaining child futures (they are simply no longer polled). This behavior is useful for implementing short-circuit logic.
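For example (a toy sketch):

```rust
use std::time::Duration;
use tokio::time::sleep;

async fn slow_ok() -> Result<&'static str, &'static str> {
    sleep(Duration::from_secs(1)).await;
    Ok("done")
}

async fn fast_err() -> Result<&'static str, &'static str> {
    sleep(Duration::from_millis(10)).await;
    Err("boom")
}

#[tokio::main]
async fn main() {
    // try_join! returns as soon as fast_err() fails; slow_ok() is simply
    // dropped without ever completing.
    let res = tokio::try_join!(slow_ok(), fast_err());
    assert_eq!(res, Err("boom"));
}
```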