User-space pipe implementation

A couple of months ago I wrote a shard that works like IO.pipe but lives entirely in user space. That is, it involves no syscalls at all. The shard is called pipe:

Two reasons this is noteworthy:

  • IO.pipe returns file descriptors, so all I/O between them must pass all the way through the kernel
  • POSIX systems set limits on how many file descriptors you can have open at one time (ulimit -n) and IO.pipe consumes two of them, so you could end up running out of file descriptors in your app if you use IO.pipe regularly
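
For anyone skimming, basic usage mirrors IO.pipe. This is a minimal sketch based on the method names used in the examples later in this thread (`Pipe.create`, `puts`, `gets`, `close`); it assumes the shard is installed:

```crystal
require "pipe"

# Pipe.create returns a user-space reader/writer pair. No file
# descriptors are allocated, so ulimit -n is unaffected.
reader, writer = Pipe.create

writer.puts "hello from user space"
writer.close

puts reader.gets
```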

Yesterday I wrote a benchmark that shows a realistic pattern I use pretty regularly: send a large stream of data into the writer end of a pipe and use the reader end as the HTTP::Request#body. This way, you’re not loading an unbounded quantity of data into RAM and then serializing it into a string that is also in RAM before passing it to the HTTP::Request.
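
A sketch of that pattern, hedged: `huge_dataset` and the URL are placeholders I made up, and I’m assuming the shard’s `Pipe.create` API shown later in the thread:

```crystal
require "pipe"
require "http/client"
require "json"

reader, writer = Pipe.create

# Producer fiber: serialize rows one at a time into the writer end.
spawn do
  huge_dataset.each do |row| # `huge_dataset` is a placeholder enumerable
    row.to_json(writer)
    writer << '\n'
  end
ensure
  writer.close # signal EOF so the HTTP client stops reading
end

# The reader end becomes the request body, so the full payload never
# has to exist in memory at once.
HTTP::Client.post("https://example.com/ingest", body: reader)
```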

The results of that benchmark show an order of magnitude difference in performance at the CPU even while performing HTTP and JSON serialization. On a cheap DigitalOcean node:

Pipe::Reader: 2.65s (2.51s user, 132ms system)
IO::FileDescriptor: 20.7s (10.3s user, 10.5s system)

On my laptop (Apple M4):

Pipe::Reader: 306ms (299ms user, 7.89ms system)
IO::FileDescriptor: 4.23s (1.24s user, 2.99s system)

Notice that it’s not just faster in “user” CPU, but it’s spending almost no time on “system” CPU (syscalls inside the kernel). This is because we’re not using syscalls at all! That system time is due to the I/O involved in sending the data over HTTP and not in sending the data over the pipe.

I have a use-case scenario for it. In my PoC for LXC container management, I currently provide shell/terminal access through Process.exec at github.com/admiracloud/admira-containers/blob/develop/src/library/admiractl.cr#L53, replacing my own program’s process with the shell process directly.

def enter(name : String)
  return Process.exec("lxc-attach", args: [name], shell: false)
end

For CLI usage this is fine, but for the real-time API in which my orchestrator should “proxy” the terminal and not die, I will have to pipe it, something like:

def enter(name : String)
  stdin_read, stdin_write = IO.pipe
  stdout_read, stdout_write = IO.pipe
  stderr_read, stderr_write = IO.pipe

  Process.run("lxc-attach", [name],
    input: stdin_read,
    output: stdout_write,
    error: stderr_write
  ) do |process|
    # ...
  end  
end
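
To make the proxying idea concrete, here is a self-contained sketch using `cat` as a stand-in for `lxc-attach` and `IO::Memory` as a stand-in for the client connection. The names and structure are my own, not taken from the PoC:

```crystal
# Stand-ins: `cat` plays the attached shell, IO::Memory plays the
# remote client. In the real API these would be lxc-attach and a socket.
client_input = IO::Memory.new("typed into the terminal\n")
client_output = IO::Memory.new

stdin_read, stdin_write = IO.pipe
stdout_read, stdout_write = IO.pipe

done = Channel(Nil).new

# Proxy fiber: client input -> child stdin.
spawn do
  IO.copy(client_input, stdin_write)
  stdin_write.close # EOF tells the child no more input is coming
end

# Proxy fiber: child stdout -> client output.
spawn do
  IO.copy(stdout_read, client_output)
  done.send nil
end

Process.run("cat", input: stdin_read, output: stdout_write)

# Close the parent's copy of the write end so the proxy fiber sees EOF.
stdout_write.close
done.receive

puts client_output.to_s
```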

And with your shard this would clearly be more efficient. :high_voltage:

I almost forgot: congratulations on this shard. High-quality work!

It is not obvious this would work, since talking to a separate process will by necessity involve system file descriptors. You can’t just wish away the fact that interacting with other processes requires syscalls (or shared memory).

That said, it seems like a useful thing. It just cannot be a full replacement for the system-provided one. Similarly, I’d really like to see a memfd-based variant of IO::Memory; it would not replace IO::Memory but complement it for the cases when you actually want what it makes possible (shared memory without involving actual files, for example).

Why not use the C API directly instead of the command line?

Also, why not use LXD or Incus?

Thanks for the important feedback @yxhuvud

Because I would need to create the C bindings, something I am not prepared to do at the moment.

The complete platform will be open source (MIT) and I will create a proper interface for it, so we will be able to replace it at a later stage.

Both LXD and Incus are excellent projects.

I am creating another option to follow a simplified and more opinionated approach. My plan is to humbly build a complete IaaS and PaaS platform that relies exclusively on LXC and nothing else.

Incus, for example, relies on OVN for more advanced network configurations, and this is something that I would like to avoid.

It does work, though. Process.new already handles mapping the process’s stdin, stdout, and stderr for you from file descriptors into any Crystal IO object you supply, so you don’t have to supply IO::FileDescriptor instances yourself. It’s really well designed.

output = IO::Memory.new

# Stores the output of `ls` in the IO::Memory instance
Process.run("ls", output: output)

puts output

I haven’t looked deeply into this but I imagine that using fd-based pipes involves a redundant pass through the kernel for all I/O (once for the pipe between the parent and child process and once for your IO.pipe pair), so I’d recommend passing IO objects that live entirely in userspace.

Same example using the pipe shard:

require "pipe"

reader, writer = Pipe.create

spawn do
  Process.run("ls", output: writer)
ensure
  # If we don't close the writer, the `IO.copy` below will never finish. This
  # is true of `IO.pipe`, as well.
  writer.close
end

IO.copy reader, STDOUT

When Process.run receives a stream that is not a file descriptor, it creates a pipe and spawns a fiber to copy data over.
So while this example works, it’s also less efficient than using a FD pipe in the first place.
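
My (simplified) mental model of that mechanism, as a sketch; this is my reading of the behavior described above, not the stdlib’s actual code:

```crystal
# What Process.run conceptually does when given an IO that is not
# a file descriptor (simplified sketch, not the real implementation):
user_io = IO::Memory.new # any non-fd IO the caller passed in

# 1. A real fd pipe is created to hand to the child process.
fd_reader, fd_writer = IO.pipe

# 2. A fiber drains the fd pipe into the caller's IO.
done = Channel(Nil).new
spawn do
  IO.copy(fd_reader, user_io)
  done.send nil
end

# The child writes into fd_writer; each byte crosses the kernel once
# through the pipe and is then copied again into `user_io`.
fd_writer << "output from the child"
fd_writer.close
done.receive

puts user_io.to_s
```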

I thought it was always doing this to accommodate non-fd IOs. I didn’t realize it skips this step when you pass an IO::FileDescriptor but, sure enough, it does. We might be able to optimize this to avoid the additional fiber, though. I’d have to play around with it, but it would likely involve another custom IO type.

In theory this seems like it’d be true but, in practice, my benchmarks show that the cost of spinning up the fiber is less than just the standard deviation of the cost of spinning up the new process. So even though the IO.pipe code path is optimized by Process, the theoretically “more efficient” of the two is close enough that even the theoretically “less efficient” one is faster nearly 50% of the time. And if you need to choose which pipe implementation to use based on performance, you shouldn’t be spinning up new processes in the first place. That takes orders of magnitude more CPU time.

Spinning up an additional fiber does not cost much efficiency. Continuously swapping to and from the fiber could have some minor impact, perhaps.
The significant overhead is the extra memory copies: We need to move data from the system pipe to the user-space pipe. That extra step is unnecessary if we use a system pipe in the first place.

There are definitely usages for an internal circular buffer pipe, for example to stream request/response bodies in HTTP/2.

It might shift from IO-bound to CPU-bound, though: fewer syscalls and kernel ↔ user-land switches, but maybe fewer opportunities for fibers to yield?

We could directly read from the process’ output fd to the pipe’s buffer, but then I don’t see the difference with IO::FileDescriptor that does just that. If the pipe is faster despite the copy, then there’s something fishy here :face_with_raised_eyebrow:

EDIT: especially with a Mutex + 2 Channel.

BTW, isn’t a pipe essentially a `Channel(Bytes)` with some bulk actions to avoid copying (and grabbing the lock) for one byte at a time? :thinking:

It’s conceptually similar. But I reckon the implementation might be quite a bit different for performance optimization.

The main thing missing is the bulk operations, but I think those would be fairly straightforward to implement in an efficient manner, at least with the simple lock-based channel implementation that we have. There would probably be bigger changes if we ever made them fully or partly lock-free.
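
As a toy illustration of the Channel(Bytes) analogy, a deliberately naive pipe-like IO built directly on a channel (this is not the shard’s implementation, just the conceptual equivalence, including the per-chunk copies that bulk operations would avoid):

```crystal
# Toy sketch: a pipe built on Channel(Bytes). A real implementation
# would add bulk operations to avoid copying and locking per chunk.
class ChannelPipe < IO
  def initialize(capacity = 8)
    @channel = Channel(Bytes).new(capacity)
    @pending = Bytes.empty
  end

  def write(slice : Bytes) : Nil
    # Copy, because the caller may reuse its buffer after write returns.
    @channel.send slice.dup
  end

  def read(slice : Bytes) : Int32
    if @pending.empty?
      @pending = @channel.receive? || return 0 # channel closed => EOF
    end
    count = Math.min(slice.size, @pending.size)
    slice.copy_from(@pending.to_unsafe, count)
    @pending = @pending[count..]
    count
  end

  def close_write : Nil
    @channel.close
  end
end

pipe = ChannelPipe.new
spawn do
  pipe << "hello "
  pipe << "world"
  pipe.close_write
end
puts pipe.gets_to_end
```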

I disagree on “significant” here. The memory copies take nanoseconds and starting the new process takes milliseconds. You’d have to perform a whole lot of copies to amortize the cost of the new process.

There’s no notable difference in how often they yield. Two separate fibers, one reading and one writing, will always swap back and forth the same way. The writer fiber will yield when the buffer fills up and the reader fiber will yield when the buffer is empty, so how often they yield is influenced more by buffer size than anything else, whether that buffer is in user or kernel space.

Example
require "pipe"

{
  IO.pipe,
  Pipe.create,
}.each do |reader, writer|
  previous_reschedule_count = Thread.current.scheduler.reschedule_count.get
  start = Time.instant
  spawn same_thread: true do
    500_000_000.times do |i|
      writer.puts i
    end
  ensure
    writer.close
  end

  while line = reader.gets
  end

  pp(
    class: reader.class,
    reschedule_count: Thread.current.scheduler.reschedule_count.get - previous_reschedule_count,
    total_time: start.elapsed,
  )
end

class Crystal::Scheduler
  getter reschedule_count = Atomic(UInt64).new(0)

  def reschedule
    @reschedule_count.add 1
    previous_def
  end
end

Results

{class: IO::FileDescriptor,
 reschedule_count: 149421,
 total_time: 00:02:55.173114162}
{class: Pipe::Reader, reschedule_count: 149198, total_time: 00:01:25.521689038}

About 0.1% variance in the number of reschedules, and the user-space pipe completed in about half the time.

Using Pipe.create(32 * 1024) for a 32KB buffer prints this:

{class: IO::FileDescriptor,
 reschedule_count: 149420,
 total_time: 00:02:53.657006995}
{class: Pipe::Reader, reschedule_count: 298396, total_time: 00:01:24.471714038}

The IO.pipe buffer defaults to 64KB on macOS and Linux (16 pages * 4KB each), and the Pipe.create buffer uses that same default. I can set my buffer size with Pipe.create(32 * 1024), like I demonstrated in one of the runs of the example above, if I need it to yield twice as often. I don’t have this option with IO.pipe.

Do you mean IO::FileDescriptor#read doesn’t copy? Or are you talking about the IPC use case?