Pipes #
The pipe is a neat feature of the Linux kernel that allows us to build one-directional communication channels between related processes (often a parent and a child).
Pipes are usually well known from shells, where we use the "|" symbol to build command pipelines. But first of all, the pipe is a system call, or actually, there are 2 of them: pipe() and pipe2() (man 2 pipe).
You can think of a pipe as a memory buffer with a byte stream API. Thus, by default, there are no messages or strict boundaries. The situation changed in Linux kernel 3.4, where the O_DIRECT flag and the packet mode were introduced. We will touch on all variants of working with pipes in this chapter.
Another important feature of pipes is the max size of an atomic write. The PIPE_BUF constant (man 7 pipe) determines it and sets it to 4096 bytes. Please read the man page carefully if you want to rely on this guarantee.
As a result of their streaming nature, a reader and a writer can use completely different user-space buffer sizes if they want. All written bytes are read sequentially, so making the lseek() syscall on a pipe is impossible.
Pipes also provide a convenient notification API for both ends. Write calls to a pipe block if the internal kernel buffer is full. The writer will block or return EAGAIN (if it's in nonblocking mode) until sufficient data has been read from the pipe to allow the write to complete. On the other hand, if all pipe readers close their read file descriptors, the writer will get the SIGPIPE signal from the kernel, and all subsequent write() calls will return the EPIPE error.
From a reader's perspective, a pipe returns a zero-size read (end-of-file, EOF) if all writers close all their write pipe file descriptors. A reader blocks until data is available if there is nothing to read (you can change this by opening a pipe in nonblocking mode).
Pipes are widely used in shells. The elegance of such an approach is that processes don't have to know that they use pipes. They continue working with their standard file descriptors (stdin, stdout and stderr) as usual. Developers also don't need to make any changes in their programs' source code in order to support this concept. It makes the process of connecting 2 programs composable, flexible, fast and reliable. Of course, in order to support such communication, shells have to do some additional work before spawning new commands (more details and examples below).
Internally, a pipe buffer is a ring buffer with slots. Each slot has the size of the PIPE_BUF constant. The number of slots is variable, and the default number is 16. So, if we multiply 16 slots by 4KiB, we get the default pipe buffer size of 64KiB.
We can control the capacity of a pipe by calling fcntl() with the F_SETPIPE_SZ flag. A pipe's system-wide max size limit can be found in /proc/sys/fs/pipe-max-size (man 7 pipe).
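For illustration, here is a rough Python sketch of resizing a pipe with fcntl(). The F_GETPIPE_SZ/F_SETPIPE_SZ values are the Linux ones, hardcoded here in case your Python version predates the constants in the fcntl module (they were added in 3.10):

```python
import fcntl
import os

# Linux-specific fcntl commands (values from <fcntl.h>)
F_SETPIPE_SZ = 1031
F_GETPIPE_SZ = 1032

r, w = os.pipe()

default_size = fcntl.fcntl(w, F_GETPIPE_SZ)  # usually 16 slots * 4KiB = 65536
fcntl.fcntl(w, F_SETPIPE_SZ, 1 << 20)        # ask the kernel for 1MiB
new_size = fcntl.fcntl(w, F_GETPIPE_SZ)

print(default_size, new_size)
```

An unprivileged process can grow a pipe only up to the /proc/sys/fs/pipe-max-size limit; the kernel may also round the requested size up to the next power of two.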
We can get the number of unread bytes in a pipe by calling ioctl() with the FIONREAD operation. We'll use it in a more elaborate example later.
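As a quick in-process sketch (the more interesting case later in this chapter steals the fd from an unrelated process), FIONREAD can be issued from Python via fcntl.ioctl():

```python
import fcntl
import os
import struct
import termios

r, w = os.pipe()
os.write(w, b"x" * 1000)

# FIONREAD fills an int with the number of unread bytes in the pipe
raw = fcntl.ioctl(r, termios.FIONREAD, struct.pack("i", 0))
unread = struct.unpack("i", raw)[0]
print(unread)
```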
The usual question about pipes is, do we really need them? Can we use regular files instead? There are several issues and missing APIs when using files instead of pipes:
- There is no easy way to notify a writer that a reader has stopped reading.
- A reader has to set up inotify (man 7 inotify) to efficiently track whether new changes appear.
- Regular files don't have a nonblocking API (this is changing with io_uring, but it's still much harder to use in comparison with the pipe() syscall).
One final introductory remark is that a pipe can be used by more than 2 processes. It's possible to have multiple writers and readers for a single pipe. It's not usual because of the streaming nature of pipes and the lack of clear boundaries by default, but with the new packet mode, it has become more useful in some situations.
How shells internally create pipes #
With shells, we usually use pipes to connect the stdout and/or the stderr of one process to the stdin of another process. For example:

stdout to stdin:

$ command1 | command2

stdout and stderr to stdin:

$ command1 |& command2

or

$ command1 2>&1 | command2
So let's understand how shells connect the following 2 commands internally.

$ ls -la | wc -l

As we already know, a shell process has three special standard open file descriptors, and all its children inherit them by default because of the fork() syscall. The following simple program shows how a shell can create a pipe and connect 2 programs. It creates a pipe in the parent process, then makes a fork() call twice in order to run execve() for the ls and wc binaries. Before the execve() calls, each child duplicates the needed standard fd onto one of the ends of the pipe.
import sys
import os

r, w = os.pipe()

ls_pid = os.fork()
if not ls_pid:
    # child: the write end of the pipe becomes stdout
    os.close(r)
    os.dup2(w, sys.stdout.fileno())
    os.close(w)
    os.execve("/bin/ls", ["/bin/ls", "-la"], os.environ)

wc_pid = os.fork()
if not wc_pid:
    # child: the read end of the pipe becomes stdin
    os.close(w)
    os.dup2(r, sys.stdin.fileno())
    os.close(r)
    os.execve("/usr/bin/wc", ["/usr/bin/wc", "-l"], os.environ)

# parent: close both ends so the children can get EOF and SIGPIPE
os.close(r)
os.close(w)

for i in range(2):
    pid, status = os.waitpid(-1, 0)
And if we run it:
$ python3 ./simple_pipe.py
12
One important note about the above code is how I close all unneeded file descriptors of the pipe. We have to close them to allow the kernel to send us the correct signals, block operations, and return EOF when there are no more writers.
Pipe and write buffer #
Modern programming languages (for example, Python) often buffer all their writes in memory before the actual write syscall executes. The main idea of such buffering is to get better I/O performance. It's cheaper to make one big write() call than several smaller ones. There are 2 types of buffers that are widely used:
- Block buffer: for example, if its size is 4KiB, the buffer will write (flush) its content to the underlying fd only when it fills up completely or an explicit flush() call is invoked.
- Line buffer: this buffer type flushes its content when a newline character is written to the buffer.
Python (like other programming languages) changes the buffer type depending on the type of the underlying file descriptor. If the fd is a terminal, the buffer will be a line buffer. That makes sense because when we are in an interactive shell, we want to get the output as soon as possible. However, a block buffer will be used for pipes and regular files because it's usually OK to postpone the flush for better performance.

The libc function isatty() (man 3 isatty) tests whether a file descriptor refers to a terminal.
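A quick sketch of the same check from Python:

```python
import os

r, w = os.pipe()
pipe_is_tty = os.isatty(r)  # False: a pipe end is not a terminal
print(pipe_is_tty)

# On an interactive terminal, os.isatty(1) returns True and Python
# picks line buffering; with stdout redirected to a pipe or a file,
# it returns False and Python falls back to block buffering.
os.close(r)
os.close(w)
```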
Let's demonstrate this behavior with 2 scripts connected by a pipe. The first one will print 4 lines to stdout, and the other one will consume these lines from its stdin.

The printing script print.py:

import time

for i in range(4):
    print(f"{i}")
    time.sleep(0.1)
And the consumer script stdin.py:

import fileinput

for i, line in enumerate(fileinput.input()):
    print(f"{i+1}: {line.rstrip()}")
If you run print.py, you should see the output printed in a line-by-line manner:
$ python3 ./print.py
0
1
2
3
Now, if we run these 2 scripts with a pipe, you should see that the output freezes for a moment, and then all lines are printed at once:

$ python3 ./print.py | python3 ./stdin.py
1: 0
2: 1
3: 2
4: 3
Now let's make it smoother. We need to add a flush() call after each print() in print.py:

import time
import sys

for i in range(4):
    print(f"{i}")
    sys.stdout.flush()
    time.sleep(0.1)
And rerun it. Now, you should be able to see that the lines appear smoothly one-by-one:
$ python3 ./print.py | python3 ./stdin.py
1: 0
2: 1
3: 2
4: 3
It's worth knowing that some core utilities have an option to control their buffering. For example, grep can be forced to use a per-line buffer with the --line-buffered option. Of course, this will give you a more interactive experience with some performance penalty. You can play with it and compare the outputs:
$ strings /var/tmp/file1.db | grep --line-buffered -E "^sek" | cat
sek.^ \
sekA
sekt
$ strings /var/tmp/file1.db | grep -E "^sek" | cat
sek.^ \
sekA
sekt
SIGPIPE signal #
One of the exciting aspects of pipes is their notification and synchronization features.

We intentionally closed all unused fds in the above code with fork() and execve() calls. The reason for doing that was not only our desire to save file descriptors and write cleaner code but also to support the pipe notification features.
If all readers close their fds of the pipe and a writer tries to send data into it, the writer process will get the SIGPIPE signal from the kernel. This is a brilliant idea. Let's assume we want to grep a huge nginx access log (for example, 500GiB) in order to find a target string, and we care only about the first three results:

$ cat /var/log/nginx/access.log | grep some_string | head -3

If we assume that the log file has all three target lines somewhere near the beginning of the file, the head command will exit almost immediately, and we don't need to continue reading the file. So, when the head util exits, it closes all its fds, including its stdin (which is a pipe). The subsequent writes from grep will cause the kernel to send it the SIGPIPE signal. The default handler for the SIGPIPE signal is to terminate, so grep will exit and close all its fds, including its stdin. In its turn, the cat command will exit after receiving its own SIGPIPE signal. So the exit of head starts the cascading exit of the whole shell pipeline.
A shell usually waits on the processes with the waitpid() syscall and collects all return codes. When it sees that the whole process pipeline has finished, the shell sets the exit status variable $? to the return code of the last command in the pipeline (head in our case) and populates the $PIPESTATUS (bash) or $pipestatus (zsh) array variable with all return codes of the pipeline.
As you can see, all the above works without any special support in the cat, grep or head tools. It's the beauty of the pipes and shells collaboration. Now we are ready to write our own proof of the above:
import signal
import os
import sys

def signal_handler(signum, frame):
    print(f"[pipe] signal number: {signum}", file=sys.stderr)
    os._exit(signum)

signal.signal(signal.SIGPIPE, signal_handler)

for i in range(9999):
    print(f"{i}")
And run it:
$ python3 ./print.py | head -3
0
1
2
[pipe] signal number: 13 <--------------
$ echo ${PIPESTATUS[@]}
13 0
$pipestatus, $? and pipefail #
We are ready to take a closer look at the exit statuses of a bash pipeline. By default, the last command in the pipeline is used for the $? variable, which can sometimes lead to unexpected results. For instance:
$ echo 'some text' | grep no_such_text | cut -f 1
$ echo $?
0
$ echo 'some text' | grep no_such_text | cut -f 1
$ echo ${PIPESTATUS[@]}
0 1 0
But fortunately, we can change this behavior with the pipefail bash option:
$ set -o pipefail
$ echo 'some text' | grep no_such_text | cut -f 1
$ echo $?
1
FIFO or Named pipes #
So far, we have been talking about pipes in the context of related processes (a parent and its children), but we also have the option to share a pipe easily among any number of unrelated processes. We can create a disk reference for a pipe which is called a named pipe or a FIFO file.
There is a high-level man page (man 7 fifo) and a tool to create a FIFO file: mkfifo (man 1 mkfifo).
The permission control is based on regular file permissions. So, if a process has “write” permissions, it can write to this named pipe.
All other aspects are identical to a regular pipe. The kernel internally creates the same pipe object and doesn’t store any data on disk.
The FIFO file could be helpful when you need to build a connection between completely unrelated programs or daemons without changing their source code.
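Here is a minimal sketch of two processes talking through a FIFO (the path is made up for the example; a fork is used for brevity, but two completely unrelated processes could do the same):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.fifo")
os.mkfifo(path, 0o600)  # permission bits work like regular file permissions

pid = os.fork()
if not pid:
    # child (writer): open() blocks until a reader opens the other end
    with open(path, "w") as fifo:
        fifo.write("hello through a named pipe\n")
    os._exit(0)

# parent (reader): blocks on open() until a writer appears
with open(path) as fifo:
    message = fifo.read()

os.waitpid(pid, 0)
os.unlink(path)
print(message, end="")
```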
pv tool #
pv, or pipe viewer (man 1 pv), is a nifty tool to work with pipes and file descriptors. We can insert it in any place of a pipeline, and it will show additional info such as ETA, write rate and the amount of transferred data with an incredible visual progress bar.

Here is the basic usage with a file, which shows us how fast the strings command can consume it:
$ pv /var/tmp/file1.db | strings > /dev/null
100MiB 0:00:01 [67.9MiB/s] [===========================================>] 100%
It can also rate-limit a pipe, which is really useful for tests:
$ cat /var/tmp/file1.db | pv --rate-limit=1K | strings
Another neat feature is monitoring the progress of every open file descriptor of a process. Under the hood, it uses procfs and its fdinfo folders to get the positions of all opened files:
$ pv -d 6864
3:/var/tmp/file1.db: 234 B 0:00:01 [0.00 B/s] [> ] 0% ETA 0:00:00
123:/var/tmp/file1.db: 234 B 0:00:01 [0.00 B/s] [> ] 0% ETA 0:00:00
Pipe usage #
We can get the number of unread bytes in a pipe by calling ioctl() with FIONREAD and a pipe fd. But how do we get pipe usage from an unrelated process that doesn't have the pipe file descriptor, for instance, from a monitoring tool? Or, for example, we started a long-running pipeline and are not sure whether the consumer of the pipe is reading data:

$ dd if=/dev/urandom | strings > /dev/null

We can, of course, use strace and check the read() syscalls in its output, but the reader could make a read() syscall with a huge buffer that we could miss in the strace output.
So, in order to achieve the goal, we need to get the pipe file descriptor somehow. The most elegant solution (but not without drawbacks) is to steal the fd with the pidfd_getfd() system call and then use ioctl() to get the usage information.
The code can be something like the following:
package main

import (
	"fmt"
	"os"
	"strconv"
	"syscall"
	"time"

	"golang.org/x/sys/unix"
)

const (
	sys_pidfd_open  = 434
	sys_pidfd_getfd = 438
	FIONREAD        = 0x541B
)

func pidfd_open(pid int) (int, error) {
	r1, _, err := syscall.Syscall(sys_pidfd_open, uintptr(pid), 0, 0)
	if err != 0 {
		return -1, err
	}
	return int(r1), nil
}

func pidfd_getfd(pidfd, targetfd int) (int, error) {
	r1, _, err := syscall.Syscall(sys_pidfd_getfd, uintptr(pidfd), uintptr(targetfd), 0)
	if err != 0 {
		return -1, err
	}
	return int(r1), nil
}

func main() {
	pid, err := strconv.Atoi(os.Args[1])
	if err != nil {
		panic(err)
	}
	fd, err := strconv.Atoi(os.Args[2])
	if err != nil {
		panic(err)
	}

	pidfd, err := pidfd_open(pid)
	if err != nil {
		panic(err)
	}

	newFd, err := pidfd_getfd(pidfd, fd)
	if err != nil {
		panic(err)
	}

	for {
		size, err := unix.IoctlGetInt(newFd, FIONREAD)
		if err != nil {
			panic(err)
		}
		fmt.Printf("size:\t%d\n", size)
		time.Sleep(time.Second) // sample once per second
	}
}
Run our target pipeline:
$ dd if=/dev/urandom | pv --rate-limit 30K | strings > /dev/null
^ KiB 0:00:16 [27.6KiB/s]
And run our tool:
$ sudo go run ./pipe_capacity.go 19990 1
size: 62464
size: 62464
size: 63488
size: 63488
size: 63488
size: 63488
size: 63488
The main drawback of such a technique is that we are holding the write end of the pipe open. This can extend the life of the reader because it will block on an empty pipe instead of getting EOF (see the pipe notification feature explained above).
Packet pipe mode (O_DIRECT) #
Since Linux kernel 3.4, a pipe can be created with the O_DIRECT flag, which puts it into packet mode. From my point of view, this mode can be successfully used only with writes and reads that are less than or equal to the PIPE_BUF size (4KiB), because atomicity is guaranteed only in that case.

The packet mode differs from the default stream mode in the following ways:
- The kernel doesn't try to merge writes into one ring buffer slot. This, of course, leads to underutilization of the pipe buffer, but provides a guarantee of message boundaries instead.
- Readers with a read() of PIPE_BUF size get the same messages as the writers wrote.
- If a reader's buffer is smaller than the data in the slot (some misconfigured reader infiltrated), the remaining data is discarded to protect the boundaries of messages.
Now let’s write an example with 2 writers and 2 readers. Every writer writes less than 4KiB, so readers will get one full message on every read:
import sys
import os
import time

PIPE_BUF = 4096

print(f"supervisor: {os.getpid()}", file=sys.stderr)

r, w = os.pipe2(os.O_DIRECT)

# fork 2 writers
for instance in range(2):
    writer_pid = os.fork()
    if not writer_pid:
        print(f"writer{instance}: {os.getpid()}", file=sys.stderr)
        os.close(r)
        for i in range(100):
            os.write(w, f"writer{instance}: {i}".encode())
            time.sleep(1)
        os._exit(0)

# fork 2 readers
for instance in range(2):
    reader_pid = os.fork()
    if not reader_pid:
        print(f"reader{instance}: {os.getpid()}", file=sys.stderr)
        os.close(w)
        for i in range(100):
            data = os.read(r, PIPE_BUF)
            if not len(data):
                break
            print(f"reader{instance}: {data}")
        os._exit(0)

os.close(r)
os.close(w)

for i in range(4):
    os.waitpid(-1, 0)
Run it:
$ python3 ./packets.py
supervisor: 1200
writer0: 1201
reader0: 1203
reader0: b'writer0: 0'
writer1: 1202
reader0: b'writer1: 0'
reader1: 1204
reader0: b'writer1: 1'
reader0: b'writer0: 1'
reader0: b'writer0: 2'
reader0: b'writer1: 2'
reader0: b'writer0: 3'
reader1: b'writer1: 3'
reader0: b'writer0: 4'
reader1: b'writer1: 4'
If we remove the O_DIRECT flag and rerun it, we can see how readers start to break the boundaries of messages and, from time to time, get 2 messages instead of 1. The situation can be even worse, and the boundaries can be violated further, if a reader reads with a buffer smaller than the writer's write.
…
reader0: b'writer0: 2writer1: 2'
reader1: b'writer0: 3writer1: 3'
reader1: b'writer1: 4writer0: 4'
reader0: b'writer0: 5'
reader1: b'writer1: 5'
reader0: b'writer0: 6writer1: 6'
reader1: b'writer1: 7'
reader0: b'writer0: 7'
Pipe nonblocking I/O #
Unlike regular files, pipes natively support nonblocking I/O. You can create a new pipe in nonblocking mode or switch an existing pipe to it. The most important outcome is the ability to poll a pipe using the poll(), select() and epoll event notification facilities. Nonblocking mode saves CPU (if used correctly) and provides a unified API for programs and developers.
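For example, a nonblocking pipe end can be watched with poll(); a small sketch:

```python
import os
import select

r, w = os.pipe()
os.set_blocking(r, False)

poller = select.poll()
poller.register(r, select.POLLIN)

empty = poller.poll(0)     # no data yet, so no events
os.write(w, b"data")
ready = poller.poll(1000)  # now the read end is reported readable

print(empty, ready)
```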
Nonblocking mode can also be useful for writing user-space busy loops that trade more CPU usage for better throughput. The idea is to skip some kernel wake-up logic and return from kernel mode as soon as possible.
The following example shows that even with Python, where exceptions are slow, we can get better throughput with a busy loop.

No busy loop code:

import os

rand = os.getrandom(1 << 16 - 1)

while True:
    os.write(1, rand)
Max throughput in my virtual machine:
$ python3 ./no_busy_loop.py | pv | strings > /dev/null
631MiB 0:00:10 [74.5MiB/s]
With busy loop:

import os
import fcntl

flags = fcntl.fcntl(1, fcntl.F_GETFL, 0)
fcntl.fcntl(1, fcntl.F_SETFL, flags | os.O_NONBLOCK)

rand = os.getrandom(1 << 16 - 1)

while True:
    try:
        n = os.write(1, rand)
    except BlockingIOError:
        continue
I was able to get about 10% better throughput in my test VM:
$ python3 ./busy_loop.py | pv | strings > /dev/null
799MiB 0:00:11 [82.7MiB/s]
Partial writes and syscall restarts #
Now we are ready to delve a bit deeper into the kernel internals. Let's assume we want to write 512 MiB of some data to a pipe. We already have it all in memory and call the write() syscall:

data = os.getrandom(1<<29)  # 512 MiB
os.write(1, data)           # where stdout is a pipe in a shell pipeline
We know from the above that the default size of a pipe is 64KiB, but man 7 pipe says:

Applications should not rely on a particular capacity: an application should be designed so that a reading process consumes data as soon as it is available, so that a writing process does not remain blocked.

It means that, in the default blocking I/O mode, our write() call should block until all bytes have been transferred through the pipe. That makes sense: the userspace application should not, in theory, care about the underlying kernel machinery. We have a userspace buffer with data, and we should be able to write it in one blocking call. But fortunately or unfortunately, things are a bit more complicated.
One piece of theory we also need to recall here is that the kernel puts a process into a sleep state if a syscall blocks. There are 2 sleep types: interruptible (S) sleep and uninterruptible (D) sleep.
For example, the above code snippet with the write() syscall puts a process into the interruptible state (because it writes to a pipe):
$ ps axu | grep write2.py
vagrant S+ 20:36 0:15 python3 ./write2.py
where S informs us that the process is in interruptible sleep (waiting for an event to complete).
Such processes are removed from the kernel scheduler list and are put in a dedicated queue waiting for a particular event.
The interruptible sleep state differs from the uninterruptible in that the kernel can deliver signals to the process. Rephrasing, it’s possible to receive and handle signals during the blocking syscall in the interruptible sleep state. But the reasonable question is, what happens after the signal is handled in the middle of such a syscall? Let’s figure it out.
We start with the pipe_write() kernel function, where we can find the following code:
if (signal_pending(current)) {
	if (!ret)
		ret = -ERESTARTSYS;
	break;
}
The above confirms that a signal can interrupt the process during a blocking pipe write() syscall. The interesting part here is the ret variable. If it doesn't hold anything yet, the kernel sets it to the -ERESTARTSYS error. Otherwise, the kernel leaves it as is. In both cases, the kernel exits the infinite for loop. This infinite for loop is what keeps the write() syscall in the blocking state, and it's in charge of transferring data between the userspace buffer (512 MiB in our case) and the kernel-space pipe ring buffer.
In turn, the ret variable stores the number of bytes transferred through the pipe. It can be much bigger than the 64KiB default pipe size because there is always at least one consumer reading from this pipe while we write.
One more piece of information that will help us understand the following examples and behavior is the ERESTARTSYS error. It signals that the kernel can safely restart a syscall because it hasn't done any meaningful work and has no side effects yet.

With all the above said, we are ready to do some coding and debugging in order to answer the question of whether one write() to a pipe is sufficient.
In our tests we'll use bpftrace. It's a handy tool that allows us to trace kernel functions via eBPF trampolines, which allow kernel code to call into BPF programs with practically zero overhead.

We'll be tracing the pipe_write() kernel function to get insights into the actual pipe writes.
Let's start with a producer of data. Here we have a signal handler for the SIGUSR1 signal, which prints the signal number, a buffer with 512 MiB of random data, and one write() syscall to the stdout fd.
import signal
import sys
import os

def signal_handler(signum, frame):
    print(f"signal {signum} {frame}", file=sys.stderr)

signal.signal(signal.SIGUSR1, signal_handler)

print(os.getpid(), file=sys.stderr)

rand = os.getrandom(1<<29)
print("generated", file=sys.stderr)

n = os.write(1, rand)
print(f"written: {n}", file=sys.stderr)
Now we need to write a consumer for the pipe. It will sleep for 30 seconds and afterward read all data from its stdin in 4KiB chunks:

import time
import sys

print("start sleeping", file=sys.stderr)
time.sleep(30)
print("stop sleeping", file=sys.stderr)

r = sys.stdin.buffer.read(4096)
while len(r) > 0:
    r = sys.stdin.buffer.read(4096)

print("finished reading", file=sys.stderr)
We are ready for the experiments. Let's launch bpftrace first. We're looking for python3 commands and want to print the return value of the pipe_write() kernel function. Please run the following in a terminal window:
$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/ { printf("%d\n", retval);}'
Attaching 1 probe...
In another terminal window, we need to start our shell pipeline with the writer under the strace tool. strace logs all write() syscalls into log.txt.
$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
1791
generated
We are in a situation where the buffer is full, the writer is in the interruptible sleep state (S), and the reader is still sleeping. It's time to open one more console and send the SIGUSR1 signal to the blocked writer:
$ kill -USR1 1791
In the console with the pipeline, you should eventually see something like the following:
$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
1791
generated
signal 10 <frame at 0x7f59b135da40, file '/home/vagrant/data/blog/post2/./write2.py', line 15, code <module>>
written: 65536
stop sleeping
finished reading
The writer received the signal and exited. It also printed that it had successfully transferred only 65536 bytes (does that number look familiar?).
The console with bpftrace confirms the above: the pipe_write() function managed to write only 64KiB of data.
$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/ { printf("%d\n", retval);}'
Attaching 1 probe...
65536
The strace log shows the same:
$ cat log.txt
write(1, ""..., 536870912) = 65536
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=806, si_uid=1000} ---
It looks like one syscall is not always sufficient. If we now open the write() syscall documentation (man 2 write):

Note that a successful write() may transfer fewer than count bytes. Such partial writes can occur for various reasons; for example, because there was insufficient space on the disk device to write all of the requested bytes, or because a blocked write() to a socket, pipe, or similar was interrupted by a signal handler after it had transferred some, but before it had transferred all of the requested bytes. In the event of a partial write, the caller can make another write() call to transfer the remaining bytes. The subsequent call will either transfer further bytes or may result in an error (e.g., if the disk is now full).

The documentation answers our initial question, but there is something even more interesting to show here.
As we saw, the pipe_write() function can also return the ERESTARTSYS error if no bytes were written. This is an interesting case: the kernel can be asked to restart such syscalls automatically without any userspace retries. It makes total sense; the syscall didn't get a chance to do any of its work, so the state is unchanged. The kernel restart is configured by setting the SA_RESTART flag. By default, it is already enabled in Python. You can check it with strace:
rt_sigaction(SIGUSR1, {sa_handler=0x45c680, sa_mask=~[], sa_flags=SA_RESTORER|SA_ONSTACK|SA_RESTART|SA_SIGINFO, sa_restorer=0x45c7c0}, NULL, 8) = 0
Now we are finished with all the theory and experiments, but we still have an unresolved task: what is the recommended way to write such big buffers into a pipe? We can find the answer in the Python source code and its buffered writer implementation:

def _flush_unlocked(self):
    …
    while self._write_buf:
        …
        n = self.raw.write(self._write_buf)
        …
        del self._write_buf[:n]

The above snippet shows how Python restarts the write() syscall in case of a partial write.
Now let's rewrite our producer to use a buffered writer and demonstrate two restart concepts:
- the automatic syscall restart;
- the restart after a partial write.
import signal
import sys
import os

def signal_handler(signum, frame):
    print(f"signal {signum} {frame}", file=sys.stderr)

signal.signal(signal.SIGUSR1, signal_handler)

print(os.getpid(), file=sys.stderr)

rand = os.getrandom(1<<29)
print("generated", file=sys.stderr)

n = sys.stdout.buffer.write(rand)  # <------------------- changed
print(f"written: {n}", file=sys.stderr)
Start a pipeline:
$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
19058
generated
This time let’s send 4 signals:
$ kill -USR1 19058
$ kill -USR1 19058
$ kill -USR1 19058
$ kill -USR1 19058
The output has changed. Now the writer was able to write all the data.
$ strace --output log.txt -s0 -e write -- python3 ./write2.py | python3 ./slow_reader.py
start sleeping
19058
generated
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
signal 10 <frame at 0x7f21a4705a40, file './write2.py', line 15, code <module>>
stop sleeping
written: 536870912
finished reading
The bpftrace logs show what we expected. For the first write, we see the default pipe buffer size; next, we see three ERESTARTSYS errors, which reflect the last 3 of our 4 signals (the first signal interrupted the initial partial write); and finally, a big write with all the remaining data.
$ sudo bpftrace -e 'kretfunc:pipe_write /comm == "python3"/ { printf("%d\n", retval);}'
65536
-512
-512
-512
536805376
In strace
log we can also see the information on syscall restarts, and it confirms what we saw in bpftrace
log.
$ cat log.txt
write(1, ""..., 536870912) = 65536
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376) = ? ERESTARTSYS (To be restarted if SA_RESTART is set)
--- SIGUSR1 {si_signo=SIGUSR1, si_code=SI_USER, si_pid=14994, si_uid=1000} ---
write(1, ""..., 536805376) = 536805376
Also, if we sum the return values of the write() syscalls, we get the initial size of the random bytes buffer:

536805376 + 65536 = 536870912
The first write() restart was done by the Python buffered writer due to the partial write of 64KiB, and the other 3 were restarted by the kernel due to the ERESTARTSYS error and the SA_RESTART flag.
Pipe performance: splice(), vmsplice() and tee() #
Generally, double buffering is used when a program makes read()/write() calls on a regular file, a socket or a pipe. One buffer is allocated in user space and then copied to the kernel. Such a situation leads to a loss of performance and undesirable memory allocations. Another potential performance penalty for high-performance tools is the number and duration of system calls per unit of program work. For instance, if we want to replace every 2nd line of a file, we need to read the file in chunks (1 read() syscall), make changes, and write the changed buffer back (1 write() syscall). This sequence of operations repeats until we reach EOF.
But luckily, there are 3 syscalls that can significantly improve your code, especially if you are going to use stdin or stdout:
- splice() – moves data between a pipe buffer and an arbitrary file descriptor, in either direction, or from one pipe buffer to another (man 2 splice);
- vmsplice() – "copies" data from user space into a pipe buffer (man 2 vmsplice);
- tee() – "copies" data from one pipe buffer to another (man 2 tee).
The main idea here is: what if we use a pipe not as a channel between 2 or more processes but just as an in-kernel ring buffer? And if you need to work with stdin and stdout, you get a win-win situation, because you don't even have to create an artificial pipe; you can use your real one.
So, for example, golang uses pipes (or a pool of them) in some zero-copy operations between sockets when io.Copy()/io.CopyN()/io.CopyBuffer()/io.ReaderFrom() are called.
So, the usage:

r, w = pipe() // allocate a kernel ring buffer of 64KiB size
for ;; {
    n = splice(socket1, w)
    if n < 0 {
        break // error, need to check it more carefully here
    }
    m = splice(r, socket2)
    if m < 0 {
        break // error, need to check it more carefully here
    }
    if m < n {
        // partial write
        // need to resend the rest of the buffer
    }
}
The above code is, of course, a pseudo code and doesn’t cover interruption errors and partial writes. But the main idea should be clear.
In theory, we can also increase the pipe buffer size, but the benefit depends on the system and the CPU cache size, and in some cases a bigger buffer might even lead to performance degradation. So do your performance tests carefully.
You can also use this approach to copy a file to another place. But there is an even better solution: the copy_file_range() syscall (man 2 copy_file_range). This one syscall does all the work of copying a file. As I mentioned earlier, fewer syscalls lead to better performance.
vmsplice() is another beast that can be useful when you want to make changes in user space and then move this memory to the kernel pipe buffer without copying it. An example:

r, w = pipe()        // allocate a kernel ring buffer of 64KiB size
n = read(file, buf)  // read data from the file into the user space buffer
modify_data(buf)     // apply some business logic to the data chunk
m = vmsplice(w, buf) // transfer the buffer to the kernel buffer

The above code is a simplified version of what can be done. But unfortunately, in reality, dealing with vmsplice() is complex, and the sparse documentation doesn't help at all. If you want to go this way, please read the kernel source code first in order to understand all possible problems with vmsplice() and zero data copying.
The last syscall we have is tee(). It's usually used together with splice(). You probably know the tee cli tool (man 1 tee). The purpose of the util is to copy data from one pipe to another while duplicating the data to a file. The coreutils implementation of tee uses read() and write() system calls to work with the pipes. But we are going to write our own version with 2 pretty new syscalls instead: tee() and splice().

The tee() system call "copies" data from one buffer (pipe) to another. In reality, no real copying happens: under the hood, the kernel just adds buffer references for the pipe memory. Thus, the tee() syscall does not consume any data, so a subsequent splice() call can still get this data from the pipe.
So our homebrew implementation could be the following:
package main
import (
"fmt"
"os"
"syscall"
)
const (
SPLICE_F_MOVE = 1
SPLICE_F_NONBLOCK = 2
)
func main() {
file_path := os.Args[1]
fmt.Println("pid:", os.Getpid())
file, err := os.OpenFile(file_path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
panic(err)
}
for {
n, err := syscall.Tee(0, 1, 1<<32-1, SPLICE_F_NONBLOCK)
if err == syscall.EAGAIN {
continue
}
if err != nil {
panic(err)
}
if n == 0 {
break
}
for n > 0 {
slen, err := syscall.Splice(0, nil, int(file.Fd()), nil, int(n), SPLICE_F_MOVE)
if err != nil {
panic(err)
}
n -= slen
}
}
}
Let’s test it:
$ cat /var/tmp/file1.db |./tee /tmp/tee.log | strings | grep -E "^ss1"
ss1T
ss1vg
ss1j1;
ss1*
And verify that the files are identical with md5sum:
$ md5sum /var/tmp/file1.db
737f4f46feed57b4c6bdde840945948e /var/tmp/file1.db
$ md5sum /tmp/tee.log
737f4f46feed57b4c6bdde840945948e /tmp/tee.log
And a final note: I would suggest reading the archive of Linus's emails about splice(), tee() and vmsplice() at https://yarchive.net/comp/linux/splice.html. You can find a lot of design questions and solutions for all these syscalls there.