Needing a concurrency construct like a `ConditionVariable`

In my NATS client, all messages to the server are buffered and there’s a fiber whose job it is to flush that buffer. The problem is that I don’t know how to signal that fiber to wake up, so right now it just wakes up in intervals to flush the buffer before going back to sleep.

When waking it up, what I’d really like is just to enqueue it rather than scheduling it as the next fiber to run. I want it to batch writes if it can for high throughput (in which case the explicit flush may not be necessary) but also reduce latency at low throughput. I don’t want to add up to 10ms to message latency because that message was put into the buffer at an unlucky time.

With POSIX threads, I might reach for a ConditionVariable (Ruby implementation) to signal that fiber, but I haven’t found a suitable construct in Crystal for this yet. The closest I’ve found is Channel, but I’ve tried that and it’s a lot slower than the current implementation.

Any ideas?

Thread::ConditionVariable exists, but it is never used nor documented (like the rest of the multithreaded support types). I don’t know if it is fiber-safe.

1 Like

Indeed. I’m trying to stick to things that are documented so I won’t have to worry about changes as much. :smile:

wait will clearly block until some other thread wakes it up, so I’d guess @jgaskins would need a fiber aware implementation, similar to how crystal provide a fiber aware implementation of mutex.

I’ve played around with using an eventfd for this, though this limits you to linux. Using a pipe could probably be a fallback on other systems. Gist: eventfd.cr · GitHub

EDIT: Hmm, apparently FreeBSD added eventfd recently. Still not available on mac or windows though.