Progressively reading JSON from a JSON::PullParser

Hi,

I’d like to check that I am not missing something before I start writing some code to work around this problem.

I have a Process, which I have connected to via a pipe, and which outputs JSON objects one at a time in response to its input. I had originally hoped that I would be able to wrap the process’s IO object in a JSON::PullParser and then use read_raw (for example; in reality I would use #to_json) to just return the next object on the stream. The problem is that after successfully parsing an object, PullParser always tries to read the stream to find out what the next token is before it returns the object, but in my case there is no input to read, because the process will not output anything else until it receives another command. So if there is no more input available but the IO is still open, the call to read_raw blocks and doesn’t return the object that it has successfully read.

E.g., this code just runs forever (or times out on the playground), even though there is enough input to read one object:

https://play.crystal-lang.org/#/r/gift

require "json"

IO.pipe do |rd, wr|
  js = JSON::PullParser.new(rd)
  wr.puts "{}"
  puts js.read_raw
end

I have a few ideas for how to work around this, but I would appreciate some advice on whether I am missing the “right” way to do it.

Many thanks!

I had to do something similar for oq, where I wanted the generation and parsing of the JSON to happen concurrently. For that I did something like:

require "json"

channel = Channel(Bool | Exception).new
reader, writer = IO.pipe

spawn do
  writer.puts "[99"
  sleep 2
  writer.puts ",100"
  writer.puts "]"
  writer.close
  channel.send true
rescue ex
  channel.send ex
end

spawn do
  pull = JSON::PullParser.new reader
  pp pull.read_begin_array
  pp pull.read_int
  pp pull.read_int
  pp pull.read_end_array
  channel.send true
rescue ex
  channel.send ex
end

2.times do
  case v = channel.receive
  when Exception then raise v
  end
end

This code spawns a fiber that’ll write some data to the writer pipe. The sleep simulates the process doing something. Execution then switches to the other fiber, which news up the pull parser and starts to read; if it ever blocks because it needs to wait for more data, execution switches back to the first fiber. This goes on until the first fiber closes the writer pipe and sends a value to the channel. Once both fibers “finish” (by sending a value to the channel), the process is complete.

EDIT: I guess in this context you could get away with running only the parsing code in a fiber, with the writing logic happening in the main fiber. Since in your case the writing logic is (I assume) a Process.run, so long as you spawn the parsing fiber before starting the process you should be good.
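Something like this, as a rough sketch (some-command is a made-up placeholder here, and I haven’t run this against a real process):

require "json"

reader, writer = IO.pipe
done = Channel(Nil).new

# Spawn the parsing fiber first, so it is ready to read as soon as
# the process starts producing output.
spawn do
  pull = JSON::PullParser.new reader
  pp pull.read_raw
  done.send nil
end

# "some-command" stands in for the real process; its standard output
# is redirected into our pipe.
Process.run("some-command", output: writer)
writer.close
done.receive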

Ref: oq/src/oq.cr at master · Blacksmoke16/oq · GitHub

Thank you for your reply; that is very interesting.

However, if I understand correctly, you still need to close the writer pipe before pull.read_end_array will return, which in my case is not really possible, because the process generating the output is long-running: I want it to continue running and to keep the pipe open, but I want to take action whenever it outputs a complete object.

Your insight is very useful though, thanks again.

You may run into an issue here, given that JSON::PullParser assumes there’s only one top-level object to parse. There was some discussion of workarounds for this in Newline-delimited JSON.
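For what it’s worth, a rough sketch of the line-at-a-time idea (this assumes the process emits exactly one JSON object per line, which may or may not hold for your output):

require "json"

# Each line is parsed independently, so nothing ever blocks waiting
# for the token *after* a completed object, and the pipe stays open.
IO.pipe do |rd, wr|
  wr.puts %({"id": 1})

  if line = rd.gets
    pp JSON.parse(line)
  end
end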


I recently needed this too; in my case, reading ndjson from a stream.

I copied and adapted the pull parser. Nothing to be very proud of, but it did the work I needed.
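Something in that spirit, as a sketch (not their actual code; each_json_line is just a made-up helper that hands you a fresh parser per line):

require "json"

# Hypothetical helper: yields one JSON::PullParser per ndjson line.
# A parser built from a single line can’t read ahead past that line,
# so it never blocks on the underlying stream.
def each_json_line(io : IO)
  while line = io.gets
    yield JSON::PullParser.new(line)
  end
end

each_json_line(IO::Memory.new %({"a": 1}\n{"a": 2}\n)) do |pull|
  pp pull.read_raw
end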


Brilliant, thank you for sharing, that’s really useful!