Waitgroup Implementation

I recently came across this lightweight implementation of waitgroups called Pond. It’s a really nice implementation.

Is there any interest from folks if I were to port Pond to be a generic waitgroup implementation for Crystal? I know there is ongoing discussion of implementing a structured concurrency model in Crystal, but I don’t think this primitive would be mutually exclusive. Waitgroups are something I find myself having to implement manually across a lot of projects.

5 Likes

Yeah, the Channel mechanic seems to be the officially recommended way to do it but it feels really clunky for what I typically use them for:

channel = Channel(Nil).new

100.times do
  spawn do
    do_work
  ensure
    channel.send nil
  end
end

100.times { channel.receive }

I usually end up using future for this:

things_to_process
  .map { |thing| future { do_work(thing) } }
  .each(&.get)

This is probably a big thing future was created for, if I had to guess. It’s been a long time since I’ve used anything like a waitgroup as a first-class entity — and it was probably thread groups or process groups at the time.

I’d be curious about some other use cases for waitgroups, though, because I typically only use this style for fanning out work.

but I don’t think this primitive would be mutually exclusive

I kinda do, as it would solve the same basic problem as a nursery would solve. You basically just have to replace the ability to add any random fiber to having a specific method on the pond to spawn a fiber and you’d have a nursery. But it seems to be a pretty neat implementation.

Aside: There are so many people (me included) that are building stuff to help with fibers, which really show that raw channels + fibers are not sufficient for the needs people have. Perhaps it is time for people to actually cooperate to build something great instead of having everyone build their own :slight_smile:

5 Likes

Yeah, I think a coordinated approach would be benefitial. These kind of tasks are very common and working solutions apply for a vast majority of uses cases. There should be a good implementation in stdlib. I’d really like to pick up #6468 again.

7 Likes

Cooperating on concurrency? That’s why I created:

Whats inside

  • Parallel map/reduce
  • CountDownLatch (I should probably rename to WaitGroup)
  • Semaphore
  • Other shit

Waitgroup/Latch design

The design is the opposite of Pond using both a channel and atomic counter.

Why? I benchmarked hooking in to the scheduler, Mutexes, atomics and of course Channels.

  • Modifying the scheduler was ruled out as too brittle with Crystal possibly changing it’s internals over time.
  • Mutexes surprisingly never performed better than Channels.
  • Atomics performed slightly better Channels, but don’t provide a way to signal fibers waiting on the WaitGroup without interfacing with the scheduler.

So a hybrid design was born.
Atomic counters are used to track individual fiber exits (for speed).
A channel is used to coordinate with any fibers waiting for the WaitGroup to finish.

WaitGroup/Latch error handling (unique feature)

A single error return was added covering a common pattern of

  • Spawn a bunch of tasks
  • Did all of them succeed? No? Show me the first error.

This is equivalent to @jgaskins example, without creating a future object for each fiber:
(IDK which future shard he’s using, but it probably contains a Channel or more per future or isn’t -Dpreview_mt safe/efficient.)

wg = Concurrent::CountDownLatch.new things_to_process.size
things.to_process.each { |thing| wg.spawn { do_work(thing) } }
wg.wait # raises if any Fibers have uncaught exceptions

This is probably lower overhead than using futures but only matters with a large number of tasks.

Future vs Waitgroup. Which to use?

Most WaitGroup uses appear to

  • Execute something without returning a value
  • Signal exit to the WaitGroup
  • Sometimes report an error
  • Blow up when fed large amounts of data (see next section)

If a return value is needed Channels or Futures are used making WaitGroups unnecessary.

Resource exhaustion when using WaitGroups, Channels or Futures

Consider a large number of tasks with either [Waitgroups, Channels or Futures] opening [a file, network connection, database, large amount of RAM or other limited resource]:

urls = [10_000_000_urls, ...]
urls.each do |url|
  # Try any of these
  spawn { http_get(url) }
  wg.spawn { http_get(url) }
  future { http_get(url) }
end

10_000_000 open files? Shit’s gonna break.

Ok, just wrap a Semaphore around limited resources. Problem solved right?

semaphore = Semaphore.new(100) # Allow 100 parallel operations
# In loop
  spawn { semaphore.acquire { http_get(url) } }

This is getting complicated. There must be a better way…

(Stay tuned for Episode 2)

2 Likes

I don’t want to pick on Pond but… it appears to use busy waiting.

If your application is waiting on IO or sleep, Pond.drain will eat a core looping.
With -Dpreview_mt it’s worse. When the number of active Fibers drops below CRYSTAL_WORKERS it will eat a core regardless of being IO or CPU bound.

If I read the code correctly. Maybe I made a mistake.