Pipes #

The pipe is a neat feature of the Linux kernel that allows us to build one-directional communication channels between related processes (often a parent and a child).

Pipes are well known from shells, where we use the “|” symbol to build command pipelines. But first of all, the pipe is a system call; actually, there are 2 of them: pipe() and pipe2() (man 2 pipe).

You can think of a pipe as a memory buffer with a byte stream API. Thus, by default, there are no messages or strict boundaries. This changed with Linux kernel 3.4, which introduced the O_DIRECT flag and the packet mode. We will cover all the variants of working with pipes in this chapter.

Another important feature of pipes is the maximum size of an atomic write. It is determined by the PIPE_BUF constant (man 7 pipe), which is 4096 bytes on Linux. Please read the man page carefully if you want to rely on this guarantee.

As a result of this streaming nature, a reader and a writer can use completely different user-space buffer sizes if they want. All written bytes are read sequentially; seeking in a pipe with the lseek() syscall is impossible.

The pipes also provide a convenient notification API for both ends. The write calls to a pipe block if the internal kernel buffer is full. The writer will block or return EAGAIN (if it’s in nonblocking mode) until sufficient data has been read from the pipe to allow the writer to complete. On the other hand, if all pipe readers close their read file descriptors, the writer will get the SIGPIPE signal from the kernel, and all subsequent write() calls will return the EPIPE error.

From a reader’s perspective, a pipe can return a zero size read (end-of-file, EOF) if all writers close all their write pipe file descriptors. A reader blocks if there is nothing to read until data is available (you can change this by opening a pipe in the nonblocking mode).
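Both notification behaviors are easy to see in a few lines of Python. Here is a minimal sketch, using only the standard os and signal modules:

import os
import signal

# Reader side: EOF after the last write end is closed.
r, w = os.pipe()
os.write(w, b"ping")
os.close(w)                   # close the only write end
print(os.read(r, 4096))       # b'ping'
print(os.read(r, 4096))       # b'' -> a zero-length read means EOF

# Writer side: EPIPE after the last read end is closed.
signal.signal(signal.SIGPIPE, signal.SIG_IGN)  # ignore the signal to see the error
r, w = os.pipe()
os.close(r)                   # no readers left
try:
    os.write(w, b"ping")
except BrokenPipeError as e:  # Python's wrapper around EPIPE
    print(e)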

Image 2. – Using a pipe to connect 2 processes

Pipes are widely used in shells. The elegance of this approach is that processes don’t have to know they are using pipes: they continue working with their standard file descriptors (stdin, stdout and stderr) as usual. Developers also don’t need to change their programs’ source code to support this concept. It makes connecting 2 programs composable, flexible, fast and reliable. Of course, in order to support such communication, shells have to do some additional work before spawning new commands (more details and examples below).

Internally, a pipe buffer is a ring buffer with slots. Each slot has the size of the PIPE_BUF constant. The number of slots is variable, and the default number is 16. So, multiplying 16 slots by 4 KiB, we get the default pipe buffer size of 64 KiB.

We can control the capacity of a pipe by calling fcntl() with the F_SETPIPE_SZ operation. The system-wide max pipe size can be found in /proc/sys/fs/pipe-max-size (man 7 pipe).
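Here is a minimal sketch of resizing a pipe from Python (assuming Python 3.10+, where the fcntl module exposes the F_GETPIPE_SZ/F_SETPIPE_SZ constants; on older versions you would pass the raw values 1032 and 1031):

import fcntl
import os

r, w = os.pipe()
print(fcntl.fcntl(w, fcntl.F_GETPIPE_SZ))    # 65536: the 64 KiB default
fcntl.fcntl(w, fcntl.F_SETPIPE_SZ, 1 << 20)  # ask for a 1 MiB buffer
print(fcntl.fcntl(w, fcntl.F_GETPIPE_SZ))    # the kernel may round the size up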

We can get the number of unread bytes in a pipe by calling ioctl() with the FIONREAD operation. We’ll write a complete cross-process example later; a quick same-process sketch is shown below.
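This is a minimal sketch (termios.FIONREAD is where Python exposes the constant):

import array
import fcntl
import os
import termios

r, w = os.pipe()
os.write(w, b"x" * 1000)

buf = array.array("i", [0])
fcntl.ioctl(r, termios.FIONREAD, buf)  # ask the kernel for the unread byte count
print(buf[0])                          # 1000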

The usual question about pipes is: do we really need them? Can we use regular files instead? There are several issues and missing APIs when using files instead of pipes:

  • There is no easy way to notify a writer that a reader has stopped reading.
  • A reader has to poll a file for new data; inotify (man 7 inotify) can track changes efficiently, but it is extra machinery to set up.
  • Also, regular files don’t have a nonblocking API (this is changing with io_uring, but it’s still much harder to use in comparison with the pipe() syscall).

One final introductory remark is that a pipe can be used by more than 2 processes: it’s possible to have multiple writers and readers for a single pipe. This is not usual because of the streaming nature of pipes and the lack of clear boundaries by default, but with the new packet mode, it has become more useful in some situations.

How shells internally create pipes #

With shells, we usually use pipes to connect the stdout and/or stderr of one process to the stdin of another. For example:

stdout to stdin:

$ command1 | command2

stdout and stderr to stdin:

$ command1 |& command2

or

command1 2>&1 | command2

So let’s understand how shells connect the following 2 commands internally.

$ ls -la | wc -l

As we already know, a shell process has three special standard file descriptors open, and all its children inherit them by default because of the fork() syscall. The following simple program shows how a shell can create a pipe and connect 2 programs. It creates a pipe in the parent process, then calls fork() twice in order to run execve() for the ls and wc binaries. Before the execve() calls, each child duplicates the needed standard fd onto one of the ends of the pipe.

import sys
import os

r, w = os.pipe()

ls_pid = os.fork()
if not ls_pid:
    # child: ls writes to the pipe instead of the terminal
    os.close(r)
    os.dup2(w, sys.stdout.fileno())
    os.close(w)
    os.execve("/bin/ls", ["/bin/ls", "-la"], os.environ)


wc_pid = os.fork()
if not wc_pid:
    # child: wc reads from the pipe instead of the terminal
    os.close(w)
    os.dup2(r, sys.stdin.fileno())
    os.close(r)
    os.execve("/usr/bin/wc", ["/usr/bin/wc", "-l"], os.environ)


# the parent closes both ends and waits for the children
os.close(r)
os.close(w)

for i in range(2):
    pid, status = os.waitpid(-1, 0)

And if we run it:

$ python3 ./simple_pipe.py
12

One important note about the above code is how I close all the unneeded file descriptors of the pipe. We have to close them to allow the kernel to deliver the correct signals, block operations correctly, and return EOF when there are no more writers.

Pipe and write buffer #

Modern programming languages (for example, Python) often buffer all their writes in memory before the actual write syscall executes. The main idea of such buffering is to get better I/O performance: it’s cheaper to make one big write() call than several smaller ones. There are 2 types of buffers that are widely used:

  • Block buffer
    For example, if its size is 4 KiB, the buffer will write (flush) its content to the underlying fd only when it fills up completely or when an explicit flush() call is invoked.
  • Line buffer
    This buffer type flushes its content whenever a newline character is written to the buffer.

Python (and other programming languages) changes the buffer type depending on the type of the underlying file descriptor. If the fd is a terminal, a line buffer is used. That makes sense: in an interactive shell, we want to see the output as soon as possible. However, a block buffer is used for pipes and regular files because it’s usually OK to postpone the flush for better performance.

The libc function isatty() (man 3 isatty) tests whether a file descriptor refers to a terminal.
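A quick way to run the same test from a script (a sketch; Python exposes the check as os.isatty()):

import os
import sys

# True when stdout is a terminal, False when it's piped, e.g.:
#   $ python3 ./check_tty.py         -> True
#   $ python3 ./check_tty.py | cat   -> False
# (check_tty.py is just a hypothetical name for this snippet)
print(os.isatty(sys.stdout.fileno()))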

Let’s demonstrate this behavior with 2 scripts connected by a pipe. The first one will print four lines to stdout, and the other one will consume these lines from its stdin.

The printing script print.py:

import time

for i in range(4):
   print(f"{i}")
   time.sleep(0.1)

And the consumer script stdin.py:

import fileinput

for i, line in enumerate(fileinput.input()):
   print(f"{i+1}: {line.rstrip()}")

If you run print.py on its own, you should see the output printed in a line-by-line manner:

$ python3 ./print.py
0
1
2
3

Now, if we run these 2 scripts connected with a pipe, you should see that the output freezes for a moment and then prints all lines at once:

$ python3 ./print.py | python stdin.py
1: 0
2: 1
3: 2
4: 3

Now let’s make it smoother. We need to add a flush() call after each print() in print.py (alternatively, print() accepts a flush=True argument):

import time
import sys

for i in range(4):
   print(f"{i}")
   sys.stdout.flush()
   time.sleep(0.1)

And rerun it. Now you should see the lines appear smoothly, one by one:

$ python3 ./print.py | python stdin.py
1: 0
2: 1
3: 2
4: 3

It’s worth knowing that some core utilities have options to control their buffering. For example, grep can be forced to flush its output on every line with the --line-buffered option. Of course, this gives a more interactive experience at the cost of some performance. You can play with it and compare the outputs:

$ strings /var/tmp/file1.db | grep --line-buffered -E "^sek" | cat
sek.^ \
sekA
sekt
$ strings /var/tmp/file1.db | grep  -E "^sek" | cat
sek.^ \
sekA
sekt

SIGPIPE signal #

One of the exciting aspects of pipes is their notification and synchronization features.

In the code above with the fork() and execve() calls, we intentionally closed all unused pipe fds. The reason for doing that was not only the desire to save file descriptors and write cleaner code but also to make the pipe notification features work.

If all readers close their ends of the pipe and a writer tries to send data into it, the writer process will get the SIGPIPE signal from the kernel. This is a brilliant idea. Let’s assume we want to grep a huge nginx access log (for example, 500 GiB) for a target string and care only about the first three results:

$ cat /var/log/nginx/access.log | grep some_string | head -3

So, if we assume that the log file has all three target lines somewhere near the beginning, the head command will exit almost immediately, and there is no need to continue reading the file. When the head util exits, it closes all its fds, including its stdin (which is a pipe). The subsequent writes from grep will cause the kernel to send it the SIGPIPE signal. The default handler for SIGPIPE terminates the process, so grep will exit and close all its fds, including its stdin. In its turn, the cat command will exit after receiving its own SIGPIPE signal. So the exit of head starts a cascading exit of the whole shell pipeline.

A shell usually waits on the processes with the waitpid() syscall and collects all the return codes. When it sees that the whole pipeline has finished, the shell sets the exit status variable $? to the return code of the last command in the pipeline (head in our case) and populates the $PIPESTATUS (bash) or $pipestatus (zsh) array variable with the return codes of all commands in the pipeline.

Note that all the above works without any special support in the cat, grep or head tools. That’s the beauty of the collaboration between pipes and shells. Let me demonstrate it.

Now we are ready to write our own proof of the above:

import signal
import os
import sys

def signal_handler(signum, frame):
	print(f"[pipe] signal number: {signum}", file=sys.stderr)
	os._exit(signum)

signal.signal(signal.SIGPIPE, signal_handler)

for i in range(9999):
	print(f"{i}")

And run it:

$ python3 ./print.py | head -3
0
1
2
[pipe] signal number: 13 <--------------
$ echo ${PIPESTATUS[@]}
13 0

$pipestatus, $? and pipefail #

We are ready to take a closer look at the exit statuses of a bash pipeline. By default, the exit code of the last command in the pipeline is used for the $? variable, which can sometimes lead to unexpected results. For instance:

$ echo 'some text' | grep no_such_text | cut -f 1
$ echo $?
0
$ echo 'some text' | grep no_such_text | cut -f 1
$ echo ${PIPESTATUS[@]}
0 1 0

But fortunately, we can change this behavior with the pipefail bash option:

$ set -o pipefail
$ echo 'some text' | grep no_such_text | cut -f 1
$ echo $?
1

FIFO or Named pipes #

So far, we have been talking about pipes in the context of related processes (a parent and its children), but we can also easily share a pipe among any number of unrelated processes. We can create an on-disk reference for a pipe, which is called a named pipe or a FIFO file.

There is a high-level man page, man 7 fifo, and a tool to create a FIFO file: mkfifo (man 1 mkfifo).

Permission control is based on regular file permissions. So, if a process has “write” permission, it can write to the named pipe.

All other aspects are identical to a regular pipe. The kernel internally creates the same pipe object and doesn’t store any data on disk.
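A minimal sketch of creating and checking a FIFO from Python (the /tmp path is just an example):

import os
import stat

path = "/tmp/demo.fifo"
os.mkfifo(path, 0o600)

# it looks like a file on disk, but it's a pipe inode
print(stat.S_ISFIFO(os.stat(path).st_mode))  # True

# note: open() on a FIFO blocks until the other end is opened too,
# e.g. a reader in one process and a writer in another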

The FIFO file could be helpful when you need to build a connection between completely unrelated programs or daemons without changing their source code.

pv tool #

pv, or pipe viewer (man 1 pv), is a nifty tool for working with pipes and file descriptors. We can insert it at any place in a pipeline, and it will show additional info such as the ETA, the write rate and the amount of transferred data, with a neat visual progress bar.

Here is the basic usage with a file; it shows us how fast the strings command can consume it:

$ pv /var/tmp/file1.db | strings > /dev/null
 100MiB 0:00:01 [67.9MiB/s] [===========================================>] 100%

It can also rate-limit a pipe, which is really useful for tests:

$ cat /var/tmp/file1.db | pv --rate-limit=1K | strings

Another neat feature is monitoring the progress of a process for every open file descriptor. Under the hood, it uses the procfs fdinfo files to get the positions of all opened files:

$ pv -d 6864
   3:/var/tmp/file1.db:  234 B 0:00:01 [0.00 B/s]   [>                  	]  0% ETA 0:00:00
 123:/var/tmp/file1.db:  234 B 0:00:01 [0.00 B/s] [>                  	]  0% ETA 0:00:00

Pipe usage #

We can get the number of unread bytes in a pipe by calling ioctl() with FIONREAD and a pipe fd. But how do we get pipe usage from an unrelated process that doesn’t have the pipe file descriptor, for instance from a monitoring tool? Or, for example, we started a long-running pipeline and are not sure whether the consumer of the pipe is reading data:

$ dd if=/dev/urandom | strings > /dev/null

We can, of course, use strace and check the read() syscalls in its output, but the reader could make a read() call with a huge buffer that we might miss in the strace output.

So, in order to achieve the goal, we need to get the pipe file descriptor somehow. The most elegant solution (but not without drawbacks) is to steal the fd with the pidfd_getfd() system call and then use ioctl() to get the usage information.

The code can be something like the following:

package main

import (
    "fmt"
    "os"
    "strconv"
    "syscall"
    "time"

    "golang.org/x/sys/unix"
)

const (
   sys_pidfd_open  = 434
   sys_pidfd_getfd = 438
   FIONREAD        = 0x541B
)

func pidfd_open(pid int) (int, error) {
   r1, _, err := syscall.Syscall(sys_pidfd_open, uintptr(pid), 0, 0)
   if err != 0 {
       return -1, err
   }
   return int(r1), nil
}

func pidfd_getfd(pidfd, targetfd int) (int, error) {
   r1, _, err := syscall.Syscall(sys_pidfd_getfd, uintptr(pidfd), uintptr(targetfd), 0)
   if err != 0 {
       return -1, err
   }
   return int(r1), nil
}

func main() {
    pid, err := strconv.Atoi(os.Args[1])
    if err != nil {
        panic(err)
    }

    fd, err := strconv.Atoi(os.Args[2])
    if err != nil {
        panic(err)
    }

    pidfd, err := pidfd_open(pid)
    if err != nil {
        panic(err)
    }

    // steal a duplicate of the target process's pipe fd
    newFd, err := pidfd_getfd(pidfd, fd)
    if err != nil {
        panic(err)
    }

    // poll the amount of unread data once per second
    for {
        size, err := unix.IoctlGetInt(newFd, FIONREAD)
        if err != nil {
            panic(err)
        }
        fmt.Printf("size:\t%d\n", size)
        time.Sleep(time.Second)
    }
}

Run our target pipeline:

$ dd if=/dev/urandom | pv --rate-limit 30K | strings > /dev/null
^ KiB 0:00:16 [27.6KiB/s]

And run our tool:

$ sudo go run ./pipe_capacity.go 19990 1
size:   62464
size:   62464
size:   63488
size:   63488
size:   63488
size:   63488
size:   63488

The main drawback of this technique is that we hold the write end of the pipe open. This can extend the life of the reader because it will block on an empty pipe instead of getting an EOF (see the pipe notification features explained above).

Packet pipe mode (O_DIRECT) #

Since Linux kernel 3.4, a pipe can be created with the O_DIRECT flag, which puts it into packet mode. From my point of view, this mode can be successfully used only with writes and reads that are less than or equal to the PIPE_BUF size (4 KiB), because atomicity is guaranteed only in this case.

The packet mode is different from the default stream mode in the following ways:

  • The kernel doesn’t try to merge several writes into one ring buffer slot. This, of course, leads to underutilization of the pipe buffer, but provides a guarantee of message boundaries instead.
  • Readers that call read() with a PIPE_BUF-sized buffer get exactly the messages the writers wrote;
  • If a reader’s buffer is smaller than the data in the slot (some misconfigured reader infiltrated), the remaining data of the packet is discarded to protect the boundaries of messages.

Now let’s write an example with 2 writers and 2 readers. Every writer writes less than 4KiB, so readers will get one full message on every read:

import sys
import os
import time


PIPE_BUF = 4096

print(f"supervisor: {os.getpid()}", file=sys.stderr)

r, w = os.pipe2(os.O_DIRECT)

# fork 2 writers
for instance in range(2):
    writer_pid = os.fork()
    if not writer_pid:
        print(f"writer{instance}: {os.getpid()}", file=sys.stderr)

        os.close(r)
        for i in range(100):
            os.write(w, f"writer{instance}: {i}".encode())
            time.sleep(1)
        os._exit(0)  # don't fall through into the supervisor code

# fork 2 readers
for instance in range(2):
    reader_pid = os.fork()
    if not reader_pid:
        print(f"reader{instance}: {os.getpid()}", file=sys.stderr)

        os.close(w)
        for i in range(100):
            data = os.read(r, PIPE_BUF)
            if not len(data):
                break
            print(f"reader{instance}: {data}")
        os._exit(0)

os.close(r)
os.close(w)

for i in range(4):
    os.waitpid(-1, 0)

Run it:

$ python3 ./packets.py
supervisor: 1200
writer0: 1201
reader0: 1203
reader0: b'writer0: 0'
writer1: 1202
reader0: b'writer1: 0'
reader1: 1204
reader0: b'writer1: 1'
reader0: b'writer0: 1'
reader0: b'writer0: 2'
reader0: b'writer1: 2'
reader0: b'writer0: 3'
reader1: b'writer1: 3'
reader0: b'writer0: 4'
reader1: b'writer1: 4'

If we remove the O_DIRECT flag and rerun it, we can see how readers start to break the boundaries of messages and, from time to time, get 2 messages instead of 1. The situation could be even worse: the boundaries could also be violated if a reader reads with a buffer smaller than what a writer wrote.

reader0: b'writer0: 2writer1: 2'
reader1: b'writer0: 3writer1: 3'
reader1: b'writer1: 4writer0: 4'
reader0: b'writer0: 5'
reader1: b'writer1: 5'
reader0: b'writer0: 6writer1: 6'
reader1: b'writer1: 7'
reader0: b'writer0: 7'

Pipe nonblocking I/O #

Unlike regular files, pipes natively support nonblocking I/O. You can create a new pipe in nonblocking mode or switch an existing pipe to it. The most important outcome is the ability to poll a pipe using the poll(), select() and epoll event notification facilities. Nonblocking mode saves CPU (if the code is written correctly) and provides a unified API for programs and developers.
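For example, here is a minimal sketch of waiting for a pipe to become readable with select() instead of blocking in read():

import os
import select

r, w = os.pipe()
os.set_blocking(r, False)  # switch the read end to nonblocking mode

pid = os.fork()
if not pid:
    # child: the writer
    os.close(r)
    os.write(w, b"ping")
    os._exit(0)

os.close(w)
# select() wakes us up when the pipe becomes readable,
# so we don't burn CPU spinning on EAGAIN
ready, _, _ = select.select([r], [], [], 5.0)
if ready:
    print(os.read(r, 4096))  # b'ping'
os.waitpid(pid, 0)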

Nonblocking mode might also be useful for writing user-space busy loops in order to get better throughput at the cost of more CPU usage. The idea is to skip some of the kernel wake-up logic and return from kernel mode as soon as possible.

The following example shows that even with python, where exceptions are slow, we can get a better throughput with a busy loop:

No busy loop code:

import os

rand = os.getrandom((1 << 16) - 1)  # a buffer just under 64 KiB

while True:
    os.write(1, rand)

Max throughput in my virtual machine:

$ python3 ./no_busy_loop.py  | pv | strings > /dev/null
 631MiB 0:00:10 [74.5MiB/s] 

With busy loop:

import os
import fcntl

# switch stdout to nonblocking mode
flags = fcntl.fcntl(1, fcntl.F_GETFL, 0)
fcntl.fcntl(1, fcntl.F_SETFL, flags | os.O_NONBLOCK)

rand = os.getrandom((1 << 16) - 1)  # a buffer just under 64 KiB

while True:
    try:
        os.write(1, rand)
    except BlockingIOError:
        # the pipe is full; busy-loop instead of sleeping in the kernel
        continue

I was able to get 10% better throughput in my test vm:

$ python3 ./busy_loop.py  | pv | strings > /dev/null
 799MiB 0:00:11 [82.7MiB/s] 

Partial writes and syscall restarts #

Now we are ready to delve a bit deeper into the kernel internals. Let’s assume we want to write 512 MiB of data to a pipe. We already have it all in memory and call the write() syscall:

data = os.getrandom(1<<29) # 512 MiB
os.write(1, data) # where stdout is a pipe in a shell pipeline

We know from the above that the size of a pipe by default is 64KiB, but the man 7 pipe says:

Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

It means that, in the default blocking I/O mode, our write() call should block until all the bytes have been transferred through the pipe. It makes sense: the userspace application should not, in theory, care about the underlying kernel machinery. We have a userspace buffer with data, and we should be able to write it in one blocking call. But fortunately or unfortunately, things are a bit more complicated.

One more piece of theory we need to recall here is that the kernel puts a process into a sleep state when a syscall blocks. There are 2 sleep types: interruptible (S) sleep and uninterruptible (D) sleep.

For example, the above code snippet with the write() syscall puts a process into the interruptible state (because it writes to a pipe):

$ ps axu | grep write2.py
vagrant    S+   20:36   0:15 python3 ./write2.py

     where S informs us that the process is in the interruptible sleep (waiting for an event to complete).

Such processes are removed from the kernel scheduler’s run queue and are put in a dedicated wait queue, waiting for a particular event.

The interruptible sleep state differs from the uninterruptible one in that the kernel can deliver signals to the process. In other words, it’s possible to receive and handle signals during a blocking syscall in the interruptible sleep state. But the reasonable question is: what happens after the signal is handled in the middle of such a syscall? Let’s figure it out.

We start with the pipe_write() kernel function, where we can find the following code:

if (signal_pending(current)) {
   if (!ret)
       ret = -ERESTARTSYS;
   break;
}

The above confirms that a signal can interrupt the process during a blocking pipe write() syscall. The interesting part here is the ret variable. If nothing has been written yet, the kernel sets it to the -ERESTARTSYS error; otherwise, the kernel leaves it as is. In both cases, the kernel exits the infinite for loop. This infinite for loop is what keeps the write() syscall in the blocking state, and it’s in charge of transferring data between the userspace buffer (512 MiB in our case) and the kernel-space pipe ring buffer.

In turn, the ret variable stores the number of bytes transferred through the pipe. It can be much bigger than the 64 KiB default pipe size because a consumer can keep reading from the pipe while we write.

One more piece of information that will help us understand the following examples and behavior is the ERESTARTSYS error. It signals that the kernel can safely restart a syscall because it hasn’t done any meaningful work yet and has no side effects.

With all the above said, we are ready to do some coding and debugging in order to answer the question of whether one write() call to a pipe is always sufficient.

In our tests, we’ll use bpftrace. It’s a handy tool that allows us to trace kernel functions via eBPF trampolines, which let the kernel call into BPF programs with practically zero overhead.

We’ll be tracing the pipe_write() kernel function to get insights into the actual pipe writes.

Let’s start with a producer of data. Here we have a signal handler for the SIGUSR1 signal (which prints the signal number), a buffer with 512 MiB of random data, and one write() syscall to the stdout fd.

import signal
import sys
import os

def signal_handler(signum, frame):
   print(f"signal {signum} {frame}" ,file=sys.stderr)

signal.signal(signal.SIGUSR1, signal_handler)

print(os.getpid(), file=sys.stderr)

rand = os.getrandom(1<<29)
print("generated", file=sys.stderr)

n = os.write(1, rand)

print(f"written: {n}", file=sys.stderr)

Now we need to write a consumer for the pipe. It will sleep for 30 seconds and afterwards read all the data from its stdin in 4 KiB chunks.

import time
import sys


print("start sleeping", file=sys.stderr)
time.sleep(30)
print("stop sleeping", file=sys.stderr)

r = sys.stdin.buffer.read(4096)
while len(r) > 0:
   r = sys.stdin.buffer.read(4096) 

print("finished reading", file=sys.stderr)

We are ready for experiments. Let’s launch bpftrace first. We’re looking for python3 commands and want to print the return value of the pipe_write() kernel function. Please run the following in a terminal window:

$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/  { printf("%d\n", retval);}'
Attaching 1 probe...

In another terminal window, we need to start our shell pipeline with the writer running under the strace tool. strace will log all write() syscalls into log.txt:

$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
1791
generated

We are now in a situation where the pipe buffer is full, the writer is in the interruptible sleep state (S), and the reader is still sleeping. It’s time to open one more console and send the SIGUSR1 signal to the blocked writer:

$ kill -USR1 1791

In the console with the pipeline, you should eventually see something like the following:

$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
1791
generated
signal 10 <frame at 0x7f59b135da40, file '/home/vagrant/data/blog/post2/./write2.py', line 15, code <module>>
written: 65536
stop sleeping
finished reading

The writer received the signal and exited. It also printed that it had successfully transferred only 65536 bytes (does that number look familiar?).

The console with bpftrace confirms the above: the pipe_write() kernel function managed to write only 64 KiB of data.

$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/  { printf("%d\n", retval);}'
Attaching 1 probe...
65536

The strace log shows the same:

$ cat log.txt 

write(1, ""..., 536870912)           	= 65536
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=806, si_uid=1000} ---

It looks like one syscall is not always sufficient. Let’s now open the write() syscall documentation (man 2 write):

Note that a successful write() may transfer fewer than count bytes. Such partial writes can occur for various reasons; for example, because there was insufficient space on the disk device to write all of the requested bytes, or because a blocked write() to a socket, pipe, or similar was interrupted by a signal handler after it had transferred some, but before it had transferred all of the requested bytes. In the event of a partial write, the caller can make another write() call to transfer the remaining bytes. The subsequent call will either transfer further bytes or may result in an error (e.g., if the disk is now full).

The documentation answers our initial question, but there is even more to show here.

As we saw, the pipe_write() function can also return the ERESTARTSYS error if no bytes have been written. It is an interesting case: the kernel can be asked to restart such syscalls automatically, without any userspace retries. It makes total sense; the syscall didn’t have a chance to do any of its work, so the state is unchanged. The kernel restart is configured by setting the SA_RESTART flag when installing a signal handler. Python enables it by default, which you can check with strace:

rt_sigaction(SIGUSR1, {sa_handler=0x45c680, sa_mask=~[], sa_flags=SA_RESTORER|SA_ONSTACK|SA_RESTART|SA_SIGINFO, sa_restorer=0x45c7c0}, NULL, 8) = 0
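If you want to control this behavior from Python, the signal.siginterrupt() wrapper flips the SA_RESTART flag for a handler. A short sketch:

import signal

def handler(signum, frame):
    pass

signal.signal(signal.SIGUSR1, handler)  # installed with SA_RESTART by default

# siginterrupt(sig, True) clears SA_RESTART: blocked syscalls will now
# fail with EINTR instead of being restarted by the kernel
signal.siginterrupt(signal.SIGUSR1, True)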

Now we are finished with all the theory and experiments, but we still have a task unresolved: what is the recommended way to write such big buffers into a pipe? We can find the answer in the Python source code and its buffered writer implementation:

def _flush_unlocked(self):
    # simplified: retry the raw write() until the internal
    # buffer has been fully consumed
    while self._write_buf:
        n = self.raw.write(self._write_buf)
        del self._write_buf[:n]

The above snippet shows how Python restarts the write() syscall in the case of a partial write.

Now let’s rewrite our producer to use a buffered writer and demonstrate two restart concepts:

  • the automatic syscall restart;
  • the restart after a partial write.

import signal
import sys
import os

def signal_handler(signum, frame):
   print(f"signal {signum} {frame}" ,file=sys.stderr)

signal.signal(signal.SIGUSR1, signal_handler)

print(os.getpid(), file=sys.stderr)

rand = os.getrandom(1<<29)
print("generated", file=sys.stderr)

n = sys.stdout.buffer.write(rand)  # <------------------- changed
print(f"written: {n}", file=sys.stderr)

Start a pipeline:

$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
19058
generated

This time let’s send 4 signals:

$ kill -USR1 19058
$ kill -USR1 19058
$ kill -USR1 19058
$ kill -USR1 19058

The output has changed: this time the writer was able to write all the data.

$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
19058
generated
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
stop sleeping
written: 536870912
finished reading

The bpftrace logs show what we expected. For the first write, we see the default pipe buffer size; then three ERESTARTSYS errors, which correspond to three of our 4 signals (the first signal interrupted the initial, partial write); and a final big write with all the remaining data.

$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/  { printf("%d\n", retval);}'
65536
-512
-512
-512
536805376

The strace log also shows the information about the syscall restarts, and it confirms what we saw in the bpftrace log:

$ cat log.txt 

write(1, ""..., 536870912)           	= 65536
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376)           	= ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376)           	= ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376)           	= ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376)           	= 536805376

Also, if we sum the return values of the write() syscalls, we get the size of the initial random buffer:

536805376 + 65536 = 536870912

The first write() restart was done by the Python buffered writer due to the partial write of 64 KiB, and the other 3 were done by the kernel due to the ERESTARTSYS error and the SA_RESTART flag.

Pipe performance: splice(), vmsplice() and tee() #

Generally, double buffering is used when a program makes read()/write() calls to a regular file, a socket or a pipe. One buffer is allocated in user space and then copied to the kernel. Such a situation leads to a loss of performance and undesirable memory allocations. Another potential performance penalty for high-performance tools is the number and duration of system calls per unit of work. For instance, if we want to replace every 2nd line of a file, we need to read the file in chunks (1 read() syscall), make changes, and write the changed buffer back (1 write() syscall). This sequence of operations has to repeat until we reach EOF.

But luckily, there are 3 syscalls that can significantly improve your code, especially if you are going to use stdin or stdout:

  • splice() – moves data from a pipe buffer to an arbitrary file descriptor, or vice versa, or from one pipe buffer to another (man 2 splice)
  • vmsplice() – “copies” data from user space into a pipe buffer (man 2 vmsplice)
  • tee() – “copies” data from one pipe buffer to another (man 2 tee)

The main idea here is: what if we use a pipe not as a channel between 2 or more processes but just as an in-kernel ring buffer? And if your program works with stdin and stdout, you get a win-win situation because you don’t even have to create an artificial pipe; you can use the real one.

So, for example, Go uses pipes (or a pool of them) in some zero-copy operations between sockets when io.Copy()/io.CopyN()/io.CopyBuffer()/io.ReaderFrom() are called.

So, the usage:

r, w = pipe() // allocate a kernel ring buffer of 64 KiB

for ;; {
    n = splice(socket1, w)
    if n < 0 {
        break // error, needs more careful checking here
    }

    m = splice(r, socket2)
    if m < 0 {
        break // error, needs more careful checking here
    }

    if m < n {
        // partial write:
        // need to resend the rest of the buffer
    }
}

The above code is, of course, pseudocode and doesn’t cover interruption errors and partial writes, but the main idea should be clear.
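For something concrete: since Python 3.10, the os module exposes splice() directly, so a sketch of the same loop (assuming two already-connected sockets, sock_in and sock_out) might look like this:

import os
import socket

def zero_copy_proxy(sock_in: socket.socket, sock_out: socket.socket):
    # the pipe acts purely as an in-kernel ring buffer between the sockets
    r, w = os.pipe()
    while True:
        n = os.splice(sock_in.fileno(), w, 65536)
        if n == 0:  # EOF on the input socket
            break
        while n > 0:  # handle partial splices to the output socket
            n -= os.splice(r, sock_out.fileno(), n)
    os.close(r)
    os.close(w)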

In theory, we can also increase the pipe buffer size, but the effect depends on the system and the CPU cache size; in some cases, a bigger buffer might even lead to performance degradation. So do your performance tests carefully.

You can also use this approach to copy a file to another place. But there is an even better solution: the copy_file_range() syscall (man 2 copy_file_range). This single syscall does all the work of copying a file. As I mentioned earlier, fewer syscalls lead to better performance.
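Python (3.8+) exposes this syscall as os.copy_file_range(); a minimal sketch of a file copy built on it (the function name is just an example):

import os

def copy_file(src_path: str, dst_path: str):
    with open(src_path, "rb") as src, open(dst_path, "wb") as dst:
        left = os.fstat(src.fileno()).st_size
        while left > 0:
            # the kernel moves the bytes; no userspace buffer is involved
            n = os.copy_file_range(src.fileno(), dst.fileno(), left)
            if n == 0:
                break
            left -= n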

vmsplice() is another beast. It can be useful when you want to make changes in user space and then move this memory into the kernel pipe buffer without copying it. An example:

r, w = pipe() // allocate a kernel ring buffer of 64 KiB

n = read(file, buf) // read data from the file into the user-space buffer

modify_data(buf) // apply some business logic to the data chunk

m = vmsplice(w, buf) // transfer the buffer to the kernel buffer

The above code is a simplified version of what can be done. Unfortunately, in reality, dealing with vmsplice() is complex, and the sparse documentation doesn’t help at all. If you want to go this way, please read the kernel source code first in order to understand all the possible problems with vmsplice() and zero-copy data handling.

The last syscall we have is tee(). It’s usually used together with splice(). You probably know the tee CLI tool (man 1 tee). Its purpose is to pass data from its stdin to its stdout while also duplicating the data into a file. The coreutils implementation of tee uses read() and write() system calls to work with the pipes, but we are going to write our own version with 2 pretty new syscalls instead: tee() and splice().

The tee() system call “copies” data from one pipe to another. In reality, no real copying happens: under the hood, the kernel just adds references to the pipe buffer memory. Thus, the tee() syscall does not consume any data from the input pipe, so a subsequent splice() call can still read this data.

So our homebrew implementation could be the following:

package main

import (
   "fmt"
   "os"
   "syscall"
)

const (
   SPLICE_F_MOVE     = 1
   SPLICE_F_NONBLOCK = 2
)

func main() {

   file_path := os.Args[1]

   fmt.Println("pid:", os.Getpid())

   file, err := os.OpenFile(file_path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
   if err != nil {
       panic(err)
   }

   for {
       n, err := syscall.Tee(0, 1, 1<<32-1, SPLICE_F_NONBLOCK)
       if err == syscall.EAGAIN {
           continue
       }

       if err != nil {
           panic(err)
       }

       if n == 0 {
           break
       }

       for n > 0 {
           slen, err := syscall.Splice(0, nil, int(file.Fd()), nil, int(n), SPLICE_F_MOVE)
           if err != nil {
               panic(err)
           }
           n -= slen
       }
   }
}

Let’s test it:

$ cat /var/tmp/file1.db |./tee /tmp/tee.log | strings | grep -E "^ss1"
ss1T
ss1vg
ss1j1;
ss1*

And verify that the files are identical with md5sum:

$ md5sum /var/tmp/file1.db
737f4f46feed57b4c6bdde840945948e  /var/tmp/file1.db
$ md5sum /tmp/tee.log
737f4f46feed57b4c6bdde840945948e  /tmp/tee.log

And a final note: I would suggest reading the archive of Linus’s emails about splice(), tee() and vmsplice() at https://yarchive.net/comp/linux/splice.html. There you can find a lot of the design questions and solutions behind all these syscalls.
