Implementing a fiber fed queue

So I want to make a queue that gets processed in my main fiber and is fed by other fibers.

Obviously a loop that checks whether there’s anything in the queue, then processes it or sleeps for a while, would work. But I’d like to minimize latency, and the shorter the sleep period, the higher the CPU usage. And it’s going to spend most of its time waiting.

So I’m thinking of something with a channel: the other fibers write to it when they enqueue messages, and the main fiber receives from it. But since send blocks, we’d essentially be using the channel as the queue and blocking the other fibers when it gets full.
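To make the channel-as-queue idea concrete, here is a minimal sketch. The method name run_channel_queue and its parameters are made up for illustration. It shows producers blocking on send once the buffer holds `capacity` pending items, while the main fiber sleeps in receive instead of polling.

```crystal
# Hypothetical helper: `producers` fibers each send `per_producer` messages
# through one buffered channel, and the calling fiber receives them all.
def run_channel_queue(producers : Int32, per_producer : Int32, capacity : Int32) : Array(String)
  # A buffered channel: send only blocks once `capacity` items are pending.
  queue = Channel(String).new(capacity)

  producers.times do |id|
    spawn do
      per_producer.times do |n|
        queue.send("producer #{id} item #{n}") # blocks this fiber while the buffer is full
      end
    end
  end

  processed = [] of String
  (producers * per_producer).times do
    # The calling fiber is suspended here until an item arrives, so no busy loop.
    processed << queue.receive
  end
  processed
end

messages = run_channel_queue(3, 4, 2)
puts messages.size
```

The trade-off is the one described above: a small capacity throttles the producers, a large one just postpones the blocking.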

Tried looking for a simple queue implementation, but most seem to be aimed at using a service, and that’s overkill for my situation.

Any hints?

Feel like "5 use cases for Crystal's select statement" on lbarasti's blog might push you in the right direction.

I wrote a shard for this a couple years ago based on the Rust mpsc library. It’s not a perfect recreation of it, but it’s an unbounded channel that was explicitly not designed to handle multiple consumers the way the stdlib Channel does. It will grow as large as it needs to in order to accommodate the data it’s being fed, allowing for surges in input.

If you need it, MPSC::Channel#receive? returns nil if it’s empty, so you also get atomic emptiness checks.

I originally wrote it to support my opentelemetry shard. During load tests, using a stdlib Channel throttled incoming HTTP requests to my app because the Channel buffer was full, so I needed something that would never block on send. Reallocating buffers was preferred over blocking for that use case.

Caveats:

  • it’s strictly single-consumer, so you can’t consume it from multiple fibers — it will raise an exception if you try
  • it doesn’t (currently) support select, so you can’t have it timeout

@Blacksmoke16 @Xen I checked the link you mentioned, and it uses the select statement. I think this could be much shorter and easier if the select were merged with the .each when there are multiple requests, something like .each.select { |each| puts "some text" if v = terminal.receive }. What do you think?

I’m not sure what .each you’re referring to, but the select talked about in the blog is not the same thing as Enumerable#select. So I don’t think it’s as simple as you think it is.
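For anyone else mixing the two up, here is a rough side-by-side. The helper names keep_even and receive_or_timeout are made up for this example. The first filters a collection with Enumerable#select, the second uses the select keyword to wait on a stdlib Channel with a timeout.

```crystal
# Enumerable#select: a plain method that filters a collection with a block.
def keep_even(numbers : Array(Int32)) : Array(Int32)
  numbers.select { |n| n.even? }
end

# The `select` keyword: waits until one of several channel operations can proceed.
# Returns the received message, or nil if nothing arrives within `wait`.
def receive_or_timeout(channel : Channel(String), wait : Time::Span) : String?
  select
  when message = channel.receive
    return message
  when timeout(wait)
    return nil
  end
end

channel = Channel(String).new(1)
channel.send("hello")
puts keep_even([1, 2, 3, 4]).inspect
puts receive_or_timeout(channel, 100.milliseconds).inspect
puts receive_or_timeout(channel, 10.milliseconds).inspect
```

The keyword form has no receiver and no block, which is why it cannot be chained onto .each.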

@Blacksmoke16 do you think it could be done as an add-on like this? If multiple requests are coming in, create an array such as array = Array.new(), and instead of puts v, add each request to it with array.append(requests). Then, if array.length == 0, break and terminate.

Oh, excellent post. I had a feeling there was something with “select” but I couldn’t find it. Which is obvious now, it’s not really documented…

Pondering this while walking the dogs, I came to the conclusion that it’s just a matter of using a non-blocking send and having the main fiber check the queue before receiving. Fibers take a bit of getting used to: my initial thought was “but what if another fiber queues another item in between the main fiber checking the queue and calling receive”, and then I realized that’s not going to happen (until we bring in real multithreading).
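Roughly what I have in mind, as a sketch only. The class name FiberFedQueue and its push/pop methods are my own invention, and it assumes the default single-threaded scheduler. Items go into a Deque, a one-slot channel carries a wake-up signal, and a select with an else branch makes the send non-blocking.

```crystal
# Hypothetical unbounded queue: many producer fibers, one consumer fiber.
# Assumes no real multithreading; the Deque is not protected by a lock.
class FiberFedQueue(T)
  def initialize
    @items = Deque(T).new
    @wakeup = Channel(Nil).new(1) # holds at most one pending wake-up signal
  end

  # Called by producers. Never blocks: the Deque grows as needed.
  def push(item : T) : Nil
    @items << item
    select
    when @wakeup.send(nil)
      # Signal delivered or buffered for the consumer.
    else
      # A signal is already pending, so the consumer will wake up anyway.
    end
  end

  # Called by the single consumer. Checks the queue first and only
  # blocks on the channel while the queue is empty.
  def pop : T
    while @items.empty?
      @wakeup.receive # suspends the consumer until a producer signals
    end
    @items.shift
  end
end

queue = FiberFedQueue(Int32).new
3.times { |id| spawn { 2.times { |n| queue.push(id * 10 + n) } } }
results = Array(Int32).new(6) { queue.pop }
puts results.sort.inspect
```

A stale signal only costs one extra pass through the loop, since pop re-checks the queue after every receive.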

Oh, nice.