I feel like I’ve seen an implementation somewhere of a readable IO that chains multiple other readable IO objects where, when one hits EOF, it picks up reading the next.
Am I making this up?
My use case is that I’m reading messages from a socket, but it’s text-based and the sentinel value is the start of the next message. But I’ve already read it, so it’s gone from the socket buffer. Effectively what I’m trying to do is the equivalent of prepending the message back to the socket buffer so it can be read as the beginning of the next message.
I don’t control the message format or I’d structure it in a way that’s more suited to how I’m consuming it. I’m streaming it off the socket instead of pulling the entire payload into RAM because these messages may be large, there may be a lot of them, and there may be many concurrent requests, so I’m holding at most one at a time per request in memory.
The only thing in the standard library that comes close to concatenating IOs on the fly is IO::ARGF, but what you’re describing sounds more like ungetc. Conceptually one could write an IO wrapper like this:
class UngettableIO < IO
def initialize(@io : IO)
@back_buffer = Array(UInt8).new
end
def unread(slice : Bytes)
slice.reverse_each { |b| @back_buffer.unshift(b) }
end
def read(slice : Bytes)
if @back_buffer.empty?
@io.read(slice)
else
bytes_read = {@back_buffer.size, slice.size}.min
@back_buffer.to_unsafe.copy_to(slice.to_unsafe, bytes_read)
@back_buffer.truncate(bytes_read..)
bytes_read
end
end
def write(slice : Bytes) : Nil
raise "not writable"
end
end
1 Like
hehe That is pretty much exactly what I ended up doing as a workaround. I called it PrependableIO and used an IO::Memory (and, as you might imagine, had to deal with cursor shenanigans due to reads and writes using the same cursor), but otherwise, yep.
class PrependableIO < IO
getter buffer : IO::Memory
getter io : IO
def initialize(@io, @buffer = IO::Memory.new)
end
def prepend(text : String) : Nil
@buffer.clear
@buffer << text
@buffer.rewind
end
def read(slice : Bytes) : Int32
if buffer.pos < buffer.size
buffer
else
io
end.read slice
end
def write(slice : Bytes) : Nil
raise NotImplementedError.new("Can't write to a PrependableIO")
end
end
I was able to optimize for the fact that I’m only ever prepending one line of text at a time, but your idea is a lot more robust. Could probably even make it a Deque to lean on its ring buffer concept.
You might want to take a look at IO::Delimited which seems like it operates the way you need (chaining multiple instances).
The implementation might be an interesting read either way. It uses the peek buffer when available. That makes it pretty complex, though. Because it also needs to handle the path when peek is unavailable, leading to duplication.
1 Like