Newline-delimited JSON

I’m trying to create a Kubernetes operator, which involves fetching newline-delimited JSON streamed from the Kubernetes API. Unfortunately, the stdlib JSON parser seems to assume that once the top-level JSON object is closed, the stream has ended.

Example code
require "json"

io = IO::Memory.new(<<-JSON)
{"name":"foo"}
{"name":"bar"}
JSON

io.rewind
pp Resource.from_json io
pp Resource.from_json io

struct Resource
  include JSON::Serializable

  getter name : String
end
Error message and stack trace
Unhandled exception: Unexpected token: { at line 2, column 1
  parsing Resource#name at line 1, column 2 (JSON::SerializableError)
  from /usr/local/Cellar/crystal/1.1.0/src/json/serialization.cr:159:7 in 'initialize:__pull_for_json_serializable'
  from ndjson.cr:17:3 in 'new_from_json_pull_parser'
  from ndjson.cr:17:3 in 'new'
  from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:13:3 in 'from_json'
  from ndjson.cr:13:4 in '__crystal_main'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:110:5 in 'main_user_code'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:96:7 in 'main'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:119:3 in 'main'
Caused by: Unexpected token: { at line 2, column 1 (JSON::ParseException)
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:691:7 in 'raise'
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:686:5 in 'unexpected_token'
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:673:14 in 'next_token_after_array_or_object'
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:509:9 in 'read_next_internal'
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:396:5 in 'read_next'
  from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:306:25 in 'read_string'
  from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:132:3 in 'new'
  from /usr/local/Cellar/crystal/1.1.0/src/json/serialization.cr:159:7 in 'initialize:__pull_for_json_serializable'
  from ndjson.cr:17:3 in 'new_from_json_pull_parser'
  from ndjson.cr:17:3 in 'new'
  from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:13:3 in 'from_json'
  from ndjson.cr:13:4 in '__crystal_main'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:110:5 in 'main_user_code'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:96:7 in 'main'
  from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:119:3 in 'main'

Is there a way the JSON::PullParser can be configured to avoid raising when the stream isn’t complete after the first object? It makes it so I can’t do something like this:

def each_event
  http_client.get("#{events_path}?watch=1") do |response|
    until response.body_io.closed?
      yield Event.from_json(response.body_io)
    end
  rescue ex : IO::Error
    # response body is closed
  end
end

I’m currently reading each line as a string and parsing that string, but I don’t love having multiple representations of each object in memory at once, especially when one of them would live entirely on the heap, so I’d prefer to parse straight from the IO if I can.
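For reference, the line-based workaround I mentioned looks roughly like this (a sketch using the same Resource struct from the first post; note the intermediate String allocation per object):

```crystal
require "json"

struct Resource
  include JSON::Serializable

  getter name : String
end

io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))

# Read one line at a time and parse each line as a standalone JSON
# document. This is the approach that keeps two representations of
# each object in memory at once: the String line plus the parsed value.
while line = io.gets
  pp Resource.from_json(line)
end
```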

Could do something like this:

memory_io = IO::Memory.new(<<-JSON)
{"name":"foo"}
{"name":"bar"}
JSON

memory_io.rewind
size = memory_io.size

until memory_io.pos == size
  pp Resource.from_json IO::Delimited.new memory_io, "\n"
end

Ooooh, TIL IO::Delimited! Thanks!

I guess it would be nice to be able to replace the delimited IO’s underlying io to avoid memory allocations. Also, I think IO::Delimited could be further optimized

Indeed. I added a method to it that lets me reuse the same instance to keep reading, similar to what we did with IO::Sized:

class IO::Delimited
  def reset
    @finished = false
    self
  end
end

In my benchmarks, it makes almost no difference, but that’s probably because for every time I was reusing an instance of it I was also parsing a bunch of JSON. :-D
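To illustrate, reusing a single instance with that reset patch might look like this (a sketch; it assumes IO::Delimited consumes the delimiter from the underlying IO when it reaches it, which matches the behavior I’ve observed):

```crystal
require "json"

struct Resource
  include JSON::Serializable

  getter name : String
end

# Monkeypatch from above: clear the "delimiter seen" flag so the same
# wrapper instance can be reused instead of allocating a new one per object.
class IO::Delimited
  def reset
    @finished = false
    self
  end
end

io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))
delimited = IO::Delimited.new(io, "\n")

results = [] of Resource
until io.pos == io.size
  results << Resource.from_json(delimited.reset)
end
pp results
```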

For this use case, a customized JSON stream parser might be a good solution. Perhaps this could even be an option for the regular JSON parser.

This is kinda what I was thinking about originally, as well, and was working on removing the EOF expectation from JSON::PullParser and JSON::Parser when I decided to post here.

But I do agree with you in principle. IO::Delimited worked for my use case, but if I were parsing pretty-printed JSON, I don’t think it would. The chances of that are probably pretty low because you wouldn’t expect an API implementing NDJSON to put newlines inside the JSON objects themselves, but some third-party APIs sure take liberties with otherwise well-defined standards.

Is there a reason the JSON parsers assume EOF after the JSON object? I’ve wondered about it for a while now but never had a solid reason to question it until now. It’s a little confusing that I can write two objects consecutively to an IO but I can’t read them the same way.

When you consume a server response by reading from an IO, how would you distinguish between the stream ending and the JSON object ending with garbage following it? Or, for example, if I send two JSON objects one after another, should that be valid, with only the first one taken?

I think there needs to be a configuration when parsing JSON that says “stop after the first object and don’t check for EOF because I want to parse more objects afterwards”, but it can’t be the default.

Is there a reason to make this distinction?

IMO, that makes a lot of sense. We can stop consuming an IO at any point, so ideally we could stop consuming it after we’ve parsed a complete object. What follows the object in the stream is irrelevant to it, so a top-level } can be the end of parsing.

When reading from a String, it’s reasonable to assume it contains a single serialized JSON value: you have the entire payload in memory all at once and you’re not “consuming” it. But when reading from an IO, you don’t necessarily have all of the data in application memory (it could be in a socket buffer or disk buffer, or a remote server may not have even sent it yet) and you are consuming it in a potentially irreversible way.
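To illustrate the String case: trailing content after the first value is rejected today, and under this distinction it still would be (a minimal sketch; the exact error message may vary by Crystal version):

```crystal
require "json"

# Two values in one string: the parser treats the whole string as a
# single document, so the second value is an unexpected trailing token.
begin
  JSON.parse(%({"name":"foo"} {"name":"bar"}))
rescue ex : JSON::ParseException
  puts ex.message
end
```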

The example I gave at the top of this thread was the Kubernetes API (specifically watch endpoints) but the Twitter streaming API (the one bots are encouraged to use) does the same thing. Like long-polling, but with a single HTTP request.

notes/caveats/disclosures

  • We’ve gone this long without anyone needing this, AFAIK, and there are workarounds, like IO::Delimited, that seem to cover the vast majority of these use cases so the need doesn’t seem urgent
  • The current implementation of JSON::PullParser might not even be able to accommodate this — it seems to consume the next token from the IO before returning the current one. For example, it reads the BeginObject token from the second object before it returns the value of the last key/value pair of the first object. So it may require rewriting most (all?) of it to accommodate this idea. So someone’s gotta really want to put in that effort and, right now, I’m not feeling that ambitious. :-)
  • I’m coming at this from the perspective of someone who has never implemented a parser in Crystal for a text-based protocol that uses delimiters between values. My shards that deal with parsing data structures (Neo4j, Redis, gRPC) deal with everything up front (byte markers, size fields, etc) and, honestly, that seems soooo much simpler, so I may very well misunderstand the constraints and complexity of parsing JSON.

I don’t have a computer with me to try it, but, for example, if you use the GitHub API, or any API, and you post a JSON payload followed by a newline and garbage, does that work?

What I’m saying is that APIs that expect JSON probably expect exactly one JSON document, and APIs that allow newline-delimited JSON explicitly allow that other thing. So to me, it seems these should be two separate libraries or implementations (one could be built on top of the other)

Or maybe the pull parser could allow parsing consecutive objects, and it’s the from_json method that controls that behavior and decides whether to check for EOF.
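Agreed that layering could work. A newline-delimited layer on top of the regular parser could be quite small — something like this hypothetical NDJSON helper (not an existing API; the module name and method are made up for illustration):

```crystal
require "json"

# Hypothetical helper module, not part of the stdlib.
module NDJSON
  # Yields one deserialized object per non-blank line. Because it builds
  # on the regular single-document parser, each line must be a complete
  # JSON document on its own (i.e. no pretty-printed input).
  def self.parse_each(type : T.class, io : IO) forall T
    while line = io.gets
      next if line.blank?
      yield type.from_json(line)
    end
  end
end

struct Resource
  include JSON::Serializable

  getter name : String
end

io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))
NDJSON.parse_each(Resource, io) do |resource|
  pp resource
end
```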

HTTP APIs aren’t the only use case for a streaming JSON parser, though. JSON is just a data-interchange format. There’s nothing in the JSON spec that prescribes only one object per stream.

MessagePack, which is just binary JSON, supports streaming multiple objects without a separate parser or even a separate configuration option. It’s just how it works:

# from crystal-community/msgpack-crystal shard
require "msgpack"

io = IO::Memory.new
{name: "foo"}.to_msgpack io
{name: "bar"}.to_msgpack io
io.rewind

pp Resource.from_msgpack io
# => Resource(@name="foo")
pp Resource.from_msgpack io
# => Resource(@name="bar")

struct Resource
  include MessagePack::Serializable

  getter name : String
end

This is effectively the same as the example code from the first post in this thread, but encodes and decodes with MessagePack instead of JSON. This works because the parser stops after it finishes parsing the object. If it works for MessagePack, is there a reason not to do the same with JSON?


I guess it would make sense.

I just tried this in Ruby:

require "stringio"
require "json"

io = StringIO.new
io << %({"foo": 1}\n{"foo": 2})
io.rewind

p JSON.load(io)

it fails with:

/Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:216:in `parse': 809: unexpected token at '{"foo": 2}' (JSON::ParserError)
	from /Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:216:in `parse'
	from /Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:569:in `load'
	from foo.cr:8:in `<main>'

So at least Ruby doesn’t like parsing from an IO and stopping after an object.

In Go you can unmarshal a JSON from a string and no trailing object is allowed. But then you can unmarshal from an IO and it stops right after an object.

So we could make that the distinction: if you parse from a string, we check for EOF (after any trailing whitespace); if we parse from an IO, we stop after an object. However, at this point we can’t introduce breaking changes, so…
