Chained `IO` for reads

I feel like I’ve seen an implementation somewhere of a readable IO that chains several other readable IO objects together: when one hits EOF, it picks up reading from the next.

Am I making this up?
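
A reader like the one described above can be sketched in a few lines. `ChainedIO` is a hypothetical name, not a standard-library class. It drains each wrapped IO in order and moves on to the next one when `read` returns 0 (EOF).

```crystal
# Hypothetical ChainedIO: reads from each wrapped IO in turn, moving on to
# the next one when the current one reports EOF (read returns 0).
class ChainedIO < IO
  def initialize(*ios : IO)
    @ios = Deque(IO).new
    ios.each { |io| @ios << io }
  end

  def read(slice : Bytes) : Int32
    # A zero-length read must not be mistaken for EOF.
    return 0 if slice.empty?

    while io = @ios.first?
      bytes_read = io.read(slice)
      return bytes_read if bytes_read > 0
      # The current IO is exhausted; drop it and try the next one.
      @ios.shift
    end
    0 # every wrapped IO has hit EOF
  end

  def write(slice : Bytes) : Nil
    raise IO::Error.new("ChainedIO is read-only")
  end
end

chained = ChainedIO.new(IO::Memory.new("Hello, "), IO::Memory.new("world!"))
puts chained.gets_to_end # prints Hello, world!
```

For the push-back case, wrapping as `ChainedIO.new(IO::Memory.new(already_read), socket)` (both names are placeholders) would replay the already-consumed text before continuing with the socket.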

My use case is reading messages from a socket. The protocol is text-based, and the only sentinel marking the end of a message is the start of the next one. By the time I see it, I’ve already read it, so it’s gone from the socket buffer. What I’m effectively trying to do is push that text back onto the front of the socket buffer so it can be read as the beginning of the next message.

I don’t control the message format or I’d structure it in a way that’s more suited to how I’m consuming it. I’m streaming it off the socket instead of pulling the entire payload into RAM because these messages may be large, there may be a lot of them, and there may be many concurrent requests, so I’m holding at most one at a time per request in memory.

The only thing in the standard library that comes close to concatenating IOs on the fly is `IO::ARGF`, but what you’re describing sounds more like C’s `ungetc` (pushing already-read data back onto the stream). Conceptually, one could write an IO wrapper like this:

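```crystal
class UngettableIO < IO
  def initialize(@io : IO)
    # Bytes pushed back with #unread; served before anything from @io.
    @back_buffer = Array(UInt8).new
  end

  # Pushes `slice` back so its bytes are the next ones returned by #read.
  def unread(slice : Bytes) : Nil
    # Insert in reverse so the first byte of `slice` ends up at the front.
    slice.reverse_each { |b| @back_buffer.unshift(b) }
  end

  def read(slice : Bytes) : Int32
    if @back_buffer.empty?
      @io.read(slice)
    else
      # Serve as much of the pushed-back data as fits in `slice`.
      bytes_read = {@back_buffer.size, slice.size}.min
      @back_buffer.to_unsafe.copy_to(slice.to_unsafe, bytes_read)
      # Keep only the bytes that were not copied out.
      @back_buffer.truncate(bytes_read..)
      bytes_read
    end
  end

  def write(slice : Bytes) : Nil
    raise "not writable"
  end
end
```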

Hehe, that’s pretty much exactly what I ended up doing as a workaround. I called it `PrependableIO` and used an `IO::Memory` (and, as you might imagine, had to deal with cursor shenanigans because reads and writes share the same cursor), but otherwise, yep.
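
The cursor issue comes from `IO::Memory` keeping a single position for both reading and writing. A short illustration using only standard-library calls, with made-up data standing in for a message header:

```crystal
buffer = IO::Memory.new
buffer << "header line\n"

# Writing left the shared cursor at the end, so there is nothing to read yet.
p buffer.gets # prints nil

# Rewinding moves the same cursor back to the start, making the data readable.
buffer.rewind
p buffer.gets # prints "header line"

# Writing after a partial read overwrites bytes from the current position.
buffer.rewind
buffer.read_char # consumes 'h'
buffer << "H"    # overwrites the 'e' at position 1
p buffer.to_s    # prints "hHader line\n"
```

This is why a prepend step has to reset the buffer (clear, write, rewind) rather than simply writing into it.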

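```crystal
class PrependableIO < IO
  getter buffer : IO::Memory
  getter io : IO

  def initialize(@io, @buffer = IO::Memory.new)
  end

  # Replaces the buffer contents with `text` and rewinds, so the next reads
  # return `text` before falling through to `io`. Any unread bytes still in
  # the buffer are discarded (fine when only one line is prepended at a time).
  def prepend(text : String) : Nil
    @buffer.clear
    @buffer << text
    @buffer.rewind
  end

  def read(slice : Bytes) : Int32
    # Drain the prepended text first, then read from the wrapped IO.
    if buffer.pos < buffer.size
      buffer
    else
      io
    end.read slice
  end

  def write(slice : Bytes) : Nil
    raise NotImplementedError.new("Can't write to a PrependableIO")
  end
end
```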

I was able to optimize for the fact that I’m only ever prepending one line of text at a time, but your idea is a lot more robust. It could probably even be backed by a `Deque` to take advantage of its ring buffer.
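
Following up on the `Deque` idea, here is a minimal sketch of the same push-back wrapper backed by Crystal’s `Deque(UInt8)`, which is designed for insertion and removal at both ends. `DequeUngettableIO` is a hypothetical name. It copies bytes one at a time, which keeps it simple but is not tuned for throughput.

```crystal
# Hypothetical Deque-backed variant of the push-back wrapper.
class DequeUngettableIO < IO
  def initialize(@io : IO)
    # Pushed-back bytes, front of the deque = next byte to be read.
    @back_buffer = Deque(UInt8).new
  end

  # Pushes `slice` back so its bytes are returned, in order, before anything else.
  def unread(slice : Bytes) : Nil
    # Unshift in reverse so the first byte of `slice` ends up at the front.
    slice.reverse_each { |byte| @back_buffer.unshift(byte) }
  end

  def read(slice : Bytes) : Int32
    return @io.read(slice) if @back_buffer.empty?

    # Serve pushed-back bytes first, as many as fit in `slice`.
    count = Math.min(@back_buffer.size, slice.size)
    count.times { |i| slice[i] = @back_buffer.shift }
    count
  end

  def write(slice : Bytes) : Nil
    raise IO::Error.new("DequeUngettableIO is read-only")
  end
end

io = DequeUngettableIO.new(IO::Memory.new("world"))
io.unread("hello ".to_slice)
puts io.gets_to_end # prints hello world
```

Unlike the single-line `PrependableIO`, repeated `unread` calls stack up: the most recently unread bytes come out first, followed by earlier ones.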

You might want to take a look at `IO::Delimited`, which seems to work the way you need (you can chain multiple instances over the same underlying IO).
The implementation might be an interesting read either way. It uses the peek buffer when available, which makes it fairly complex, since it also has to handle the case where peek is unavailable, and that leads to some duplication.
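
A small illustration of chaining `IO::Delimited` instances over one underlying IO, using in-memory data in place of a socket and `|` as an arbitrary example delimiter. Each instance reads up to its delimiter and then reports EOF; because the delimiter itself is consumed, a fresh instance picks up right after it. For a format where the delimiter is the start of the next message, that consumed text would still need to be restored some other way.

```crystal
# One underlying stream holding three "|"-separated records.
io = IO::Memory.new("first|second|third")

records = Array(String).new
3.times do
  # Each IO::Delimited stops at the next "|" (or at EOF for the last record).
  records << IO::Delimited.new(io, read_delimiter: "|").gets_to_end
end
p records # prints ["first", "second", "third"]
```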
