I’m trying to create a Kubernetes operator, which involves fetching newline-delimited JSON streamed from the Kubernetes API. Unfortunately the stdlib JSON parser seems to assume that after the top-level JSON object is closed, that’s the end of the stream.
Example code
require "json"
io = IO::Memory.new(<<-JSON)
{"name":"foo"}
{"name":"bar"}
JSON
io.rewind
pp Resource.from_json io
pp Resource.from_json io
struct Resource
  include JSON::Serializable

  getter name : String
end
Error message and stack trace
Unhandled exception: Unexpected token: { at line 2, column 1
parsing Resource#name at line 1, column 2 (JSON::SerializableError)
from /usr/local/Cellar/crystal/1.1.0/src/json/serialization.cr:159:7 in 'initialize:__pull_for_json_serializable'
from ndjson.cr:17:3 in 'new_from_json_pull_parser'
from ndjson.cr:17:3 in 'new'
from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:13:3 in 'from_json'
from ndjson.cr:13:4 in '__crystal_main'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:110:5 in 'main_user_code'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:96:7 in 'main'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:119:3 in 'main'
Caused by: Unexpected token: { at line 2, column 1 (JSON::ParseException)
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:691:7 in 'raise'
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:686:5 in 'unexpected_token'
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:673:14 in 'next_token_after_array_or_object'
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:509:9 in 'read_next_internal'
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:396:5 in 'read_next'
from /usr/local/Cellar/crystal/1.1.0/src/json/pull_parser.cr:306:25 in 'read_string'
from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:132:3 in 'new'
from /usr/local/Cellar/crystal/1.1.0/src/json/serialization.cr:159:7 in 'initialize:__pull_for_json_serializable'
from ndjson.cr:17:3 in 'new_from_json_pull_parser'
from ndjson.cr:17:3 in 'new'
from /usr/local/Cellar/crystal/1.1.0/src/json/from_json.cr:13:3 in 'from_json'
from ndjson.cr:13:4 in '__crystal_main'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:110:5 in 'main_user_code'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:96:7 in 'main'
from /usr/local/Cellar/crystal/1.1.0/src/crystal/main.cr:119:3 in 'main'
Is there a way JSON::PullParser can be configured not to raise when the stream continues after the first object? As it stands, I can’t do something like this:
def each_event
  http_client.get("#{events_path}?watch=1") do |response|
    until response.body_io.closed?
      yield Event.from_json(response.body_io)
    end
  rescue ex : IO::Error
    # response body is closed
  end
end
I’m currently reading each line as a string and parsing that string, but I don’t love having multiple representations of each object in memory at once, especially when one of them would live entirely on the heap, so I’d prefer to parse straight from the IO if I can.
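For reference, that line-by-line workaround might look something like this minimal sketch (the `Resource` struct and the in-memory IO are stand-ins for the real types and response body):

```crystal
require "json"

struct Resource
  include JSON::Serializable

  getter name : String
end

# Stand-in for a streamed response body.
io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))

# Each iteration allocates the line as a heap String before parsing it,
# so every object briefly has two representations in memory.
while line = io.gets
  pp Resource.from_json(line)
end
```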
I guess it would be nice to be able to replace IO::Delimited’s underlying io to avoid memory allocations. Also, I think IO::Delimited could be further optimized
Indeed. I added a method to it that lets me reuse the same instance to keep reading, similar to what we did with IO::Sized:
class IO::Delimited
  def reset
    @finished = false
    self
  end
end
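Putting that patch together with IO::Delimited, the reuse looks roughly like this (the `Resource` struct and the in-memory IO are stand-ins for the real stream; the patch relies on IO::Delimited’s internal `@finished` flag):

```crystal
require "json"

# Patch from above: let an IO::Delimited keep reading past its delimiter.
class IO::Delimited
  def reset
    @finished = false
    self
  end
end

struct Resource
  include JSON::Serializable

  getter name : String
end

io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))
delimited = IO::Delimited.new(io, read_delimiter: "\n")

# Each from_json call sees EOF at the newline, so the parser stops cleanly;
# reset lets the same instance continue with the next object.
pp Resource.from_json(delimited)       # => Resource(@name="foo")
pp Resource.from_json(delimited.reset) # => Resource(@name="bar")
```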
In my benchmarks, it makes almost no difference, but that’s probably because for every time I was reusing an instance of it I was also parsing a bunch of JSON. :-D
This is kinda what I was thinking about originally, as well, and was working on removing the EOF expectation from JSON::PullParser and JSON::Parser when I decided to post here.
But I do agree with you in principle. IO::Delimited worked for my use case, but if I were parsing pretty-printed JSON, I don’t think it would. The chances of that are probably pretty low because you wouldn’t expect an API implementing NDJSON to contain newlines inside the JSON object itself, but some third-party APIs sure take liberties with otherwise well-defined standards.
Is there a reason the JSON parsers assume EOF after the JSON object? I’ve wondered about it for a while now but never had a solid reason to question it until now. It’s a little confusing that I can write two objects consecutively to an IO but I can’t read them the same way.
When you consume a server response by reading from an IO, how would you distinguish between the stream ending or the JSON object ending and garbage following it? Or, for example, if I send two JSON objects one after another, should that be valid and only the first one be taken?
I think there needs to be a configuration when parsing JSON that says “stop after the first object and don’t check for EOF because I want to parse more objects afterwards”, but it can’t be the default.
IMO, that makes a lot of sense. We can stop consuming an IO at any point, so ideally we could stop consuming it after we’ve parsed a complete object. What follows the object in the stream is irrelevant to it, so a top-level } can be the end of parsing.
When reading from a String, it makes a lot of sense to assume that the string only contains a single JSON object. It’s reasonable to assume a string is a single serialized JSON value because you have the entire payload in memory all at once and you’re not “consuming” that. But when reading from an IO, you don’t necessarily have all of the data in application memory (could be in a socket buffer or disk buffer, or a remote server may not have even sent it yet) and you are consuming it in a potentially irreversible way.
The example I gave at the top of this thread was the Kubernetes API (specifically watch endpoints) but the Twitter streaming API (the one bots are encouraged to use) does the same thing. Like long-polling, but with a single HTTP request.
notes/caveats/disclosures
We’ve gone this long without anyone needing this, AFAIK, and there are workarounds, like IO::Delimited, that seem to cover the vast majority of these use cases so the need doesn’t seem urgent
The current implementation of JSON::PullParser might not even be able to accommodate this — it seems to consume the next token from the IO before returning the current one. For example, it reads the BeginObject token from the second object before it returns the value of the last key/value pair of the first object. So it may require rewriting most (all?) of it to accommodate this idea. So someone’s gotta really want to put in that effort and, right now, I’m not feeling that ambitious. :-)
I’m coming at this from the perspective of someone who has never implemented a parser in Crystal for a text-based protocol that uses delimiters between values. My shards that deal with parsing data structures (Neo4j, Redis, gRPC) deal with everything up front (byte markers, size fields, etc) and, honestly, that seems soooo much simpler, so I may very well misunderstand the constraints and complexity of parsing JSON.
I don’t have a computer with me to try it, but, for example, if you use the GitHub API, or any API, and you post a JSON object followed by a newline and garbage, does that work?
What I’m saying is that APIs that expect a JSON payload probably expect exactly one JSON payload, and APIs that allow newline-delimited JSON allow that other thing. So to me, it seems these should be two separate libraries or implementations (one could be built on top of the other)
HTTP APIs aren’t the only use case for a streaming JSON parser, though. JSON is just a data-interchange format. There’s nothing in the JSON spec that prescribes only one object per stream.
MessagePack, which is just binary JSON, supports streaming multiple objects without a separate parser or even a separate configuration option. It’s just how it works:
# from crystal-community/msgpack-crystal shard
require "msgpack"
io = IO::Memory.new
{name: "foo"}.to_msgpack io
{name: "bar"}.to_msgpack io
io.rewind
pp Resource.from_msgpack io
# => Resource(@name="foo")
pp Resource.from_msgpack io
# => Resource(@name="bar")
struct Resource
  include MessagePack::Serializable

  getter name : String
end
This is effectively the same as the example code from the first post in this thread, but encodes and decodes with MessagePack instead of JSON. This works because the parser stops after it finishes parsing the object. If it works for MessagePack, is there a reason not to do the same with JSON?
/Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:216:in `parse': 809: unexpected token at '{"foo": 2}' (JSON::ParserError)
from /Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:216:in `parse'
from /Users/asterite/.rbenv/versions/3.0.0/lib/ruby/3.0.0/json/common.rb:569:in `load'
from foo.cr:8:in `<main>'
So at least Ruby doesn’t like parsing from an IO and stopping after an object.
In Go you can unmarshal a JSON from a string and no trailing object is allowed. But then you can unmarshal from an IO and it stops right after an object.
So, we could make it a difference: if you parse from a string, we check for EOF (after any trailing whitespace.) If we parse from an IO we stop after an object. However, at this point we can’t introduce breaking changes, so…
ndjson is just a different standard/language than json, so maybe trying to generalize the pull parser is not the best thing to do, as tempting as it might be.
Unless there is a generalization that can be combined in an orthogonal way, I think it’s best to just have a different implementation/shard.
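For what it’s worth, a separate layer on top of the stdlib parser can be quite small. A hypothetical sketch (the `NDJSON` module and `Resource` struct are made up for illustration, using the line-as-String approach rather than parsing straight from the IO):

```crystal
require "json"

module NDJSON
  # Yields one deserialized object per newline-delimited line.
  def self.parse_each(io : IO, type : T.class, & : T ->) forall T
    while line = io.gets
      next if line.empty?
      yield type.from_json(line)
    end
  end
end

struct Resource
  include JSON::Serializable

  getter name : String
end

io = IO::Memory.new(%({"name":"foo"}\n{"name":"bar"}\n))
NDJSON.parse_each(io, Resource) { |resource| pp resource }
```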