Tokio I/O Patterns #
Contents
- TCP split stream
- Split generic AsyncRead+AsyncWrite stream
- Bidirectional driver for I/O without split
- Framed I/O
- Bidirectional driver for framed I/O
Now let’s look at the possible solutions and weigh their pros and cons.
TCP split stream #
The first obvious solution, suggested by the documentation, is to split the TCP stream into two parts: a reader and a writer.
The Tokio TcpStream API provides split() and into_split(). They differ in ownership and allocations: split() borrows the stream, while into_split() takes ownership (and allocates internally) so the halves can move into separate tasks. Because we stay in one Tokio task, the borrowing split() variant is cheaper.
let mut stream = TcpStream::connect(&addr).await?;
let (mut r, mut w) = stream.split();

let reader = async {
    loop {
        let n = r.read(&mut read_buf).await?;
        if n == 0 {
            // EOF - server closed connection
            eprintln!("Server closed the connection.");
            break;
        }
        print!("{}", String::from_utf8_lossy(&read_buf[..n]));
    }
    Ok::<_, io::Error>(())
};

let writer = async {
    while let Some(msg) = rx.recv().await {
        w.write_all(&msg).await?;
        w.flush().await?;
        println!("client's written");
    }
    Ok::<_, io::Error>(())
};

tokio::try_join!(reader, writer)?;
Ok(())
The code creates two futures, reader and writer, and runs them concurrently with try_join!.
Backpressure on writes happens naturally: each message is pulled from the rx channel and then written and awaited inside the writer future, so a slow peer only slows the writer, while the reader future keeps running unblocked.
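For contrast, here is a minimal sketch of the owning into_split() variant, which is what you would reach for if the halves had to live in separately spawned tasks. It reuses the same rx channel of outgoing messages from the example above; the rest of the names are illustrative, not from the original code:

let stream = TcpStream::connect(&addr).await?;
// Owning halves are 'static, so they can move into spawned tasks.
let (mut r, mut w) = stream.into_split();

let read_task = tokio::spawn(async move {
    let mut read_buf = vec![0u8; 8192];
    loop {
        let n = r.read(&mut read_buf).await?;
        if n == 0 {
            break; // EOF - server closed connection
        }
        print!("{}", String::from_utf8_lossy(&read_buf[..n]));
    }
    Ok::<_, io::Error>(())
});

let write_task = tokio::spawn(async move {
    while let Some(msg) = rx.recv().await {
        w.write_all(&msg).await?;
        w.flush().await?;
    }
    Ok::<_, io::Error>(())
});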
Let's add cancellation that works even under backpressure, without contorting the existing futures.
We can leverage try_join!'s short-circuiting by introducing a dedicated cancellation future. If the cancel future errors, try_join! returns immediately and the other futures stop:

tokio::try_join!(reader, writer, cancel)?;

where cancel is:
let cancel = {
    async move {
        tokio::select! {
            _ = cancel_token.cancelled() => Err(cancel_err),
            _ = finish_rx.recv() => Ok(()),
        }
    }
};
and where:
- cancel_token – is Tokio's CancellationToken, useful for signalling a cancellation request to one or more tasks.
- finish_rx – is a way to exit the cancel future without short-circuiting the try_join!.
The reader and the writer need to close (drop) their clones of the finish_tx channel on exit, regardless of whether they finish with success or an error.
let (finish_tx, mut finish_rx) = mpsc::channel::<()>(1);

let _guard = finish_tx.clone();
let reader = async move {
    // Captured by the future and dropped when it finishes, closing this finish_tx clone.
    let _guard = _guard;
    loop {
        let n = r.read(&mut read_buf).await?;
        if n == 0 {
            // EOF - server closed connection
            eprintln!("Server closed the connection.");
            break;
        }
        print!("{}", String::from_utf8_lossy(&read_buf[..n]));
    }
    Ok::<_, io::Error>(())
};

let _guard = finish_tx.clone();
let writer = async {
    let _guard = _guard;
    while let Some(msg) = rx.recv().await {
        w.write_all(&msg).await?;
        w.flush().await?;
    }
    Ok::<_, io::Error>(())
};

drop(finish_tx); // drop the origin
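For completeness, here is a sketch of how the token might be wired up and triggered. The Ctrl-C trigger is an assumption for illustration only, not part of the original code:

use tokio_util::sync::CancellationToken;

let cancel_token = CancellationToken::new();

// Something, somewhere, requests cancellation - for example a Ctrl-C handler.
let trigger = cancel_token.clone();
tokio::spawn(async move {
    let _ = tokio::signal::ctrl_c().await;
    trigger.cancel();
});

// The error returned by the cancel future when the token fires.
let cancel_err = io::Error::new(io::ErrorKind::Interrupted, "cancelled");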
The full code can be found on GitHub.
The above code works, but before pushing it to production, consider the following improvements and limitations:
- Think about cancellation safety. It's not a simple topic overall, but in our case the message dequeued from the rx channel might be lost during cancellation in a backpressure situation. More info here and a lot of examples here.
- On cancellation or write error – drain the reader buffer until an error or EOF. This can be useful to:
  - Continue reading the response on a best-effort basis. For example, you might try to read the HTTP header from the buffer for debugging or to forward a truncated response.
  - Reuse the underlying TCP/TLS connection.
  - Trigger custom cleanup logic when cancellation occurs.

I leave this refinement to the reader.
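As a starting point, a rough sketch of such a drain helper might look like this. The cap, buffer size, and error handling are arbitrary illustrative choices, not from the original code:

use std::io;
use tokio::io::{AsyncRead, AsyncReadExt};

/// Read and collect (or just discard) whatever the peer still sends, up to a cap,
/// so the connection can be reused or a partial response examined.
async fn drain<R: AsyncRead + Unpin>(r: &mut R, cap: usize) -> io::Result<Vec<u8>> {
    let mut collected = Vec::new();
    let mut buf = [0u8; 4096];
    while collected.len() < cap {
        match r.read(&mut buf).await {
            Ok(0) => break,                          // EOF
            Ok(n) => collected.extend_from_slice(&buf[..n]),
            Err(e) => return Err(e),                 // stop on the first error
        }
    }
    Ok(collected)
}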
Split generic AsyncRead+AsyncWrite stream #
The split approach works for TcpStream and other basic types from tokio::io. But real applications usually favor generics instead: most adapters accept AsyncRead+AsyncWrite so they compose well. For instance:
async fn handle<IO: AsyncRead + AsyncWrite>(stream: IO) -> io::Result<()> {
    ...
}
You can still split such a stream with tokio::io::split(). The example becomes:
async fn handle<IO: AsyncRead + AsyncWrite + Unpin>(
    stream: IO,
    mut rx: mpsc::Receiver<Bytes>,
) -> io::Result<()> {
    let mut read_buf = vec![0u8; 8192];
    let (mut r, mut w) = tokio::io::split(stream);

    let reader = async {
        loop {
            let n = r.read(&mut read_buf).await?;
            if n == 0 {
                // EOF - server closed connection
                eprintln!("Server closed the connection.");
                break;
            }
            print!("{}", String::from_utf8_lossy(&read_buf[..n]));
        }
        Ok::<_, io::Error>(())
    };

    let writer = async {
        while let Some(msg) = rx.recv().await {
            w.write_all(&msg).await?;
            w.flush().await?;
            println!("client's written");
        }
        Ok::<_, io::Error>(())
    };

    tokio::try_join!(reader, writer)?;
    Ok(())
}
It looks almost the same (omitting cancellation). But there is one caveat: tokio::io::split() uses an internal mutex. If we evaluate the possible contention on this mutex, though, we can see that as long as both halves remain in the same task (i.e., you don't spawn()), contention is negligible and the overhead remains minimal.
The cancellation case mirrors the TCP-specific version.
Bidirectional driver for I/O without split #
One important caveat of tokio::io::split(): splitting is not always safe for every stream implementation. The full story is covered in this GitHub issue, but the TL;DR is:

Some streams (AsyncRead+AsyncWrite), including TLS transports, do not keep reads and writes independent. A read may need to trigger a write (alerts, key updates, etc.), and vice versa. Because of that coupling, simply splitting into separate read and write tasks can lead to deadlocks, “stuck” futures, or missed wakeups (i.e. the read half might block waiting on a write event that never gets woken, etc.).
Even though this issue is specific to Tokio tasks and does not affect concurrent child futures (there is a single waker per task, so concurrent readers and writers are fine), you might still want to build a lock-free, fully controlled, and highly performant communication layer. In such cases, io::split() can be unnecessary, or even overkill. For example, the h2 HTTP/2 and Hyper HTTP/1 crates do not rely on io::split() at all; instead, they adopt a connection-driver approach.
The connection-driver approach (sometimes called a connection task or connection future) is a pattern where, instead of splitting the AsyncRead/AsyncWrite stream and working with its halves, a single Connection future is created that drives the entire protocol state machine. It's called a driver because polling it drives the connection forward.
Let’s write our own concurrent connection-driver.
First, remember that if we keep the stream's read and write halves in one task, the I/O operations are multiplexed on readiness and run one after another. This simple model works well for I/O-bound workloads where the time between events is much longer than the processing time.
Second, we can't directly use the concurrency helpers, because the stream implements both AsyncRead and AsyncWrite, and borrowing it mutably twice would violate Rust's borrowing rules. Instead, we need to implement a custom future that owns the I/O, performs reads and writes manually, and proxies messages to/from an actor-like interface.
Tokio's tokio::io::copy_bidirectional is a great example: it copies between two streams by juggling buffers and states while preserving backpressure; no mutexes or helper macros are used, just raw async primitives.
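For reference, using it is essentially a one-liner; here is a minimal TCP proxy sketch (the function name and addresses are illustrative):

use tokio::io;
use tokio::net::TcpStream;

async fn proxy(mut client: TcpStream, upstream_addr: &str) -> io::Result<()> {
    let mut upstream = TcpStream::connect(upstream_addr).await?;
    // Drives both directions in a single future until both sides reach EOF,
    // preserving backpressure in each direction.
    let (client_to_upstream, upstream_to_client) =
        io::copy_bidirectional(&mut client, &mut upstream).await?;
    println!("proxied {client_to_upstream} bytes up, {upstream_to_client} bytes down");
    Ok(())
}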
Another example is the linkerd2
proxy’s duplex copy, which follows similar design ideas.
Let's reproduce this logic for our own use case, with one additional feature. To recap, the requirements for our driver are:
- The driver acts as an actor around an I/O stream.
- It should provide a channel to feed outbound messages and another channel to collect ready-to-consume reads.
- As a bonus, expose a write barrier command that guarantees all previous writes are flushed.
The user API:
let stream = TcpStream::connect("127.0.0.1:8080").await?;
let stream = BufStream::new(stream);

let (conn, tx, mut rx) = Connection::new(stream)?;
tokio::spawn(async move {
    if let Err(err) = conn.await {
        panic!("err: {err}");
    }
});

tx.send(message).await.unwrap();

while let Some(buf) = rx.recv().await {
    println!("recv from server: {:?}", buf);
}
where the message is a tuple of a Bytes buffer and an optional callback that triggers a flush call:
type Ack = oneshot::Sender<()>;
type Outgoing = (Bytes, Option<Ack>);

...

let (ack_tx, ack_rx) = oneshot::channel();
let message: Outgoing = (Bytes::from_static(b"message"), Some(ack_tx));
tx.send(message).await.unwrap();
_ = ack_rx.await.unwrap(); // <--------- barrier

...
The full code can be found on GitHub, but here I just want to show some important parts.
The main structure of the Connection
with buffers and states:
pub struct Connection<T> {
    stream: T,
    read_buf: BytesMut, // reusable buffer for read IO calls

    // read from IO
    inbound_buf: Option<Bytes>,
    tx: PollSender<Bytes>,

    // write to IO
    outbound_buf: Option<Outgoing>,
    rx: mpsc::Receiver<Outgoing>,

    read_state: ReadState,
    write_state: WriteState,
}
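The ReadState and WriteState types are not shown in these snippets; here is a plausible shape, inferred from how the driver below uses them (the actual definitions in the full code may differ):

enum ReadState {
    Reading,
    Done,
}

// Where to go after a flush completes.
enum FromFlushingTo {
    Writing,
    ShuttingDown,
}

struct Flush {
    ack: Option<Ack>,     // write-barrier callback to fire after the flush
    next: FromFlushingTo, // state to enter once flushed
}

enum WriteState {
    Writing,
    Flushing(Flush),
    ShuttingDown,
    Done,
}

impl WriteState {
    fn set_flushing(&mut self, ack: Option<Ack>, next: FromFlushingTo) {
        *self = WriteState::Flushing(Flush { ack, next });
    }
}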
The core of the driver lies in its implementation of the Future
trait:
impl<T> Future for Connection<T>
where
    T: AsyncRead + AsyncWrite + Unpin,
{
    type Output = Result<(), io::Error>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if matches!(self.read_state, ReadState::Reading) {
            _ = self.poll_read(cx)?;
        }

        if matches!(self.write_state, WriteState::Writing) {
            _ = self.poll_write(cx)?;
        }

        if matches!(self.write_state, WriteState::Flushing(_)) {
            _ = self.poll_flush(cx)?;
        }

        if matches!(self.write_state, WriteState::ShuttingDown) {
            _ = self.poll_shutdown(cx)?;
        }

        if matches!(self.read_state, ReadState::Done)
            && matches!(self.write_state, WriteState::Done)
        {
            return Poll::Ready(Ok(()));
        }

        Poll::Pending
    }
}
The future sequentially checks readiness and performs reads and writes on every wake-up. If either side returns Poll::Pending
, the driver continues to the next step instead of yielding immediately. The first error stops the driver.
The write side runs in three phases: pull ready-to-write buffers from the channel and write them to the I/O stream, then flush, then shut down. The dedicated Flushing state lets us implement write barriers: it forces an I/O flush and runs a callback once the flush completes.
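Building on the state shape sketched earlier, the Flushing phase might look roughly like this (a sketch only; the full code on GitHub may differ):

fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    // Wait until the underlying stream has actually flushed.
    ready!(Pin::new(&mut self.stream).poll_flush(cx))?;

    // Fire the barrier ack (if any) and move to the next write state.
    // poll_flush is only called while in Flushing, so the replace is safe.
    if let WriteState::Flushing(flush) = std::mem::replace(&mut self.write_state, WriteState::Done) {
        if let Some(ack) = flush.ack {
            let _ = ack.send(()); // the receiver may be gone; that's fine
        }
        self.write_state = match flush.next {
            FromFlushingTo::Writing => WriteState::Writing,
            FromFlushingTo::ShuttingDown => WriteState::ShuttingDown,
        };
    }
    Poll::Ready(Ok(()))
}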
Backpressure #
We need to apply backpressure to both sides of the driver communication:
- Read side only pulls from the socket when the upstream channel has capacity. If it’s full, reading stops and the kernel TCP window naturally pushes back on the peer.
- Write side only pulls the next message after the current one is fully written (and optionally flushed + acked). If the socket can’t accept bytes, writing yields with the remaining bytes kept in place.
Read backpressure: “don’t read unless you can forward” #
Key idea: buffer at most one message (inbound_buf) until the mpsc sender tx reserves a slot. If the channel is full, we yield and stop reading.
fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    loop {
        // 1. If we already have a message to forward, try to reserve capacity upstream.
        if self.inbound_buf.is_some() {
            ready!(
                self.tx
                    .poll_reserve(cx)
                    .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Channel closed"))?
            );

            // Capacity is guaranteed now; forward the buffered chunk.
            self.tx
                .send_item(self.inbound_buf.take().expect("buffered message missing"))
                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "Channel closed"))?;
        }

        // 2. Ensure read buffer has enough capacity, then read from the socket.
        if self.read_buf.capacity() < READ_BUF_CAPACITY {
            self.read_buf.reserve(READ_BUF_CAPACITY);
        }
        let n = ready!(tokio_util::io::poll_read_buf(
            Pin::new(&mut self.stream),
            cx,
            &mut self.read_buf
        ))?;

        // 3. EOF, finish reads & close upstream channel.
        if n == 0 {
            self.read_state = ReadState::Done;
            self.tx.close();
            return Poll::Ready(Ok(()));
        }

        // 4. Move the just-read bytes into a message to forward next loop iteration.
        self.inbound_buf = Some(self.read_buf.split().freeze());
    }
}
Write backpressure: “finish what you started, then maybe flush & ack” #
Key idea: keep the current (Bytes, Option<Ack>)
in outbound_buf
. Don’t pull the next message until this one is completely written (and optionally flushed + acked). If the socket can’t take bytes now, yield with progress saved.
fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    loop {
        // 1. If nothing is buffered, try to pull from the bounded mpsc.
        if self.outbound_buf.is_none() {
            match self.rx.poll_recv(cx) {
                Poll::Ready(Some(msg)) => self.outbound_buf = Some(msg),
                Poll::Ready(None) => {
                    // Sender closed: finish by flushing, then shut down.
                    self.write_state.set_flushing(None, FromFlushingTo::ShuttingDown);
                    return Poll::Ready(Ok(()));
                }
                Poll::Pending => {
                    // Nothing to send right now; make sure we flush kernel/TLS buffers.
                    self.write_state.set_flushing(None, FromFlushingTo::Writing);
                    return Poll::Pending;
                }
            }
        }

        // 2. Drain the current buffer into the socket, handling partial writes.
        if let Some((buf, ack)) = self.outbound_buf.as_mut() {
            while !buf.is_empty() {
                match ready!(Pin::new(&mut self.stream).poll_write(cx, buf)?) {
                    0 => {
                        return Poll::Ready(Err(io::Error::new(
                            io::ErrorKind::WriteZero,
                            "write zero bytes",
                        )))
                    }
                    n => buf.advance(n),
                }
            }

            // 3. Optional message-level completion: flush, then send ack.
            if let Some(ack) = ack.take() {
                self.write_state.set_flushing(Some(ack), FromFlushingTo::Writing);
                // Poll::Pending is returned without a registered waker here, so wake manually.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }

            // 4. Fully written; try the next message.
            self.outbound_buf = None;
        }
    }
}
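For completeness, the shutdown phase can be as small as this (again, a sketch; the full code may differ):

fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    // Close the write side of the I/O, then mark the write half as finished.
    ready!(Pin::new(&mut self.stream).poll_shutdown(cx))?;
    self.write_state = WriteState::Done;
    Poll::Ready(Ok(()))
}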
Cancellation #
A short-circuit cancellation is pretty straightforward in this case:
let cancel = CancellationToken::new();

tokio::spawn(async move {
    tokio::select! {
        Err(err) = conn => panic!("err: {err}"),
        _ = cancel.cancelled() => return,
        else => return,
    }
});
Improvements #
There are several improvements to consider and play with:
- Once again about cancellation safety: decide how critical losing buffered data would be for you.
- Improve read-buffer allocation and copying; you can tune strategies based on expected frame sizes.
- Randomize the order of read and write calls to avoid starving one side. A simple round-robin toggle is often enough.
- On write errors, drain reads before returning – just like the earlier split example.
- The channel logic uses a single buffer:
  - Try to use poll_recv_many() to get more messages at once instead of poll_recv() (see the sketch below).
  - Try to use try_reserve_many() on the sender side.
  - Or, as an alternative to the above, change the channel to work with multiple buffers instead of a single one: let (tx, rx) = mpsc::channel::<(Vec<Bytes>, Ack)>(128)
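To get a feel for the batching APIs, here is a standalone sketch using the async recv_many (poll_recv_many is its poll-form counterpart for use inside the driver). It assumes a Tokio version recent enough to ship these methods:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(128);

    tokio::spawn(async move {
        for i in 0..100 {
            tx.send(i).await.unwrap();
        }
    });

    let mut batch = Vec::new();
    // recv_many pulls up to `limit` queued messages at once;
    // it returns 0 once the channel is closed and empty.
    while rx.recv_many(&mut batch, 16).await != 0 {
        println!("got {} messages in one go", batch.len());
        batch.clear();
    }
}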
Framed I/O #
So far we've passed raw Bytes through the I/O. But that's usually not what we do in real apps, where we work with structured data. In practice, streams decode bytes into domain-specific frames/messages/structures and encode frames back into bytes. The encoder/decoder pair is usually called a codec. Tokio provides helpful traits and utilities in the tokio_util::codec module to build composable framed APIs.
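To make the codec idea concrete, here is a minimal hand-rolled length-prefix codec implementing the Decoder and Encoder traits. It is only a sketch; LengthDelimitedCodec used below gives you this, and much more, out of the box:

use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io;
use tokio_util::codec::{Decoder, Encoder};

struct U32LenCodec;

impl Decoder for U32LenCodec {
    type Item = Bytes;
    type Error = io::Error;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Bytes>, io::Error> {
        if src.len() < 4 {
            return Ok(None); // not enough bytes for the length prefix yet
        }
        let len = u32::from_be_bytes(src[..4].try_into().unwrap()) as usize;
        if src.len() < 4 + len {
            src.reserve(4 + len - src.len());
            return Ok(None); // wait for the full frame
        }
        src.advance(4);
        Ok(Some(src.split_to(len).freeze()))
    }
}

impl Encoder<Bytes> for U32LenCodec {
    type Error = io::Error;

    fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), io::Error> {
        dst.put_u32(item.len() as u32); // big-endian length prefix
        dst.extend_from_slice(&item);
        Ok(())
    }
}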
We can wrap an I/O stream with Framed::new(). If the I/O implements AsyncRead+AsyncWrite, the resulting framed stream also offers split() (from StreamExt) because it implements both Stream and Sink. Internally this behaves like io::split, but uses a specialized BiLock, a lock optimized for exactly two owners.
Here's how it fits together when we use a length-prefix codec for messages:
async fn handle<IO: AsyncRead + AsyncWrite + Unpin>(
    stream: IO,
    rx: mpsc::Receiver<Bytes>,
) -> io::Result<()> {
    let stream = Framed::new(stream, LengthDelimitedCodec::new());

    // https://docs.rs/futures/latest/futures/lock/struct.BiLock.html
    let (writer, reader) = stream.split();

    let reader = reader.try_for_each(|msg| async move {
        print!("{}", String::from_utf8_lossy(&msg));
        Ok(())
    });

    let writer = ReceiverStream::new(rx).map(Ok).forward(writer);

    tokio::try_join!(reader, writer)?;
    Ok(())
}
This mirrors the earlier io::split() example but gains the convenience of stream adapters. Note how elegant, composable, and compatible the code is.
The full code can be found on GitHub.
Bidirectional driver for framed I/O #
Finally, we can upgrade the bytes-based driver to a framed one.
The client API changed a bit:
let (conn, tx, mut rx) = ConnectionFramed::new(stream, LengthDelimitedCodec::new())?;
tokio::spawn(async move {
    if let Err(err) = conn.await {
        panic!("err: {err}");
    }
});
It is almost identical to the bytes driver, except that it works with encoder/decoder generics and generic inbound/outbound types.
#[pin_project]
pub struct ConnectionFramed<T, C, In, Out>
where
    T: AsyncRead + AsyncWrite + Unpin,
    C: Decoder<Item = In, Error = io::Error> + Encoder<Out, Error = io::Error>,
    In: Send + fmt::Debug,
    Out: Send + fmt::Debug,
{
    #[pin]
    stream: Framed<T, C>,

    // read from IO
    inbound_buf: Option<In>,
    tx: PollSender<In>,

    // write to IO
    outbound_buf: Option<Outgoing<Out>>,
    rx: mpsc::Receiver<Outgoing<Out>>,

    read_state: ReadState,
    write_state: WriteState,
}
Everything else follows the same pattern.
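For illustration, the read half might now look roughly like this: instead of raw poll_read_buf, the driver polls the Framed stream for already-decoded frames via Stream::poll_next. This is a sketch assuming the same PollSender-based forwarding; the real code on GitHub may differ:

fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
    let mut this = self.project();
    loop {
        // Forward a buffered frame once the channel has capacity, as before.
        if this.inbound_buf.is_some() {
            ready!(this.tx.poll_reserve(cx))
                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"))?;
            this.tx
                .send_item(this.inbound_buf.take().expect("buffered frame missing"))
                .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "channel closed"))?;
        }

        // Pull the next decoded frame from the Framed stream.
        match ready!(this.stream.as_mut().poll_next(cx)) {
            Some(item) => *this.inbound_buf = Some(item?),
            None => {
                // EOF: finish reads and close the upstream channel.
                *this.read_state = ReadState::Done;
                this.tx.close();
                return Poll::Ready(Ok(()));
            }
        }
    }
}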
The full code can be found here.
Putting it together #
Tokio offers several levels of control. Splitting a stream is often sufficient for simple TCP code, especially when paired with try_join! for basic cancellation. Generic I/O benefits from the same pattern via tokio::io::split, while custom drivers provide fine-grained control when splitting is unsafe or when you need extra customization. Choose the lightest abstraction that preserves backpressure and cancellation for your use case, and reach for a dedicated driver only when you need the extra guarantees.