Cooperating on concurrency? That’s why I created:
What's inside
- Parallel map/reduce
- CountDownLatch (I should probably rename to WaitGroup)
- Semaphore
- Other shit
WaitGroup/Latch design
The design is the opposite of Pond's: it uses both a channel and an atomic counter.
Why? I benchmarked hooking into the scheduler, Mutexes, atomics and of course Channels.
- Modifying the scheduler was ruled out as too brittle, since Crystal may change its internals over time.
- Mutexes surprisingly never performed better than Channels.
- Atomics performed slightly better than Channels, but don't provide a way to signal fibers waiting on the WaitGroup without interfacing with the scheduler.
So a hybrid design was born.
Atomic counters are used to track individual fiber exits (for speed).
A channel is used to coordinate with any fibers waiting for the WaitGroup to finish.
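Roughly, the hybrid looks like this (a minimal sketch of the counter-plus-channel idea, not the shard's actual implementation; MiniWaitGroup is a hypothetical name):

class MiniWaitGroup
  def initialize(count : Int32)
    @count = Atomic(Int32).new(count) # fast path: track fiber exits
    @done = Channel(Nil).new          # slow path: wake waiting fibers
  end

  def count_down
    # Atomic#sub returns the value *before* subtracting
    @done.close if @count.sub(1) == 1 # last fiber closes the channel
  end

  def wait
    @done.receive? # returns nil once the channel is closed
  end
end

Closing the channel wakes every waiter at once, while the hot path (each fiber exiting) stays a single atomic op.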
WaitGroup/Latch error handling (unique feature)
A single error return was added, covering a common pattern:
- Spawn a bunch of tasks
- Did all of them succeed? No? Show me the first error.
This is equivalent to @jgaskins' example, without creating a future object for each fiber:
(IDK which future shard he's using, but it probably contains a Channel or more per future, or isn't -Dpreview_mt safe/efficient.)
wg = Concurrent::CountDownLatch.new things_to_process.size
things_to_process.each { |thing| wg.spawn { do_work(thing) } }
wg.wait # raises if any Fibers have uncaught exceptions
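For contrast, a future-based version might look like this (a sketch only; I'm assuming a future shard where future { } returns a handle with a .get that re-raises the task's exception):

futures = things_to_process.map { |thing| future { do_work(thing) } }
futures.each &.get # blocks, re-raising the first uncaught exception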
This is probably lower overhead than using futures, but it only matters with a large number of tasks.
Future vs WaitGroup. Which to use?
Most WaitGroup uses appear to:
- Execute something without returning a value
- Signal exit to the WaitGroup
- Sometimes report an error
- Blow up when fed large amounts of data (see next section)
If a return value is needed, Channels or Futures are used instead, making WaitGroups unnecessary.
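For example, a plain stdlib Channel already covers both the return value and the completion signal (a sketch; fetch_title and pages are hypothetical stand-ins):

results = Channel(String).new
pages.each do |page|
  spawn { results.send fetch_title(page) }
end
titles = Array.new(pages.size) { results.receive } # no WaitGroup needed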
Resource exhaustion when using WaitGroups, Channels or Futures
Consider a large number of tasks with either [WaitGroups, Channels or Futures] opening [a file, network connection, database connection, large amount of RAM or other limited resource]:
urls = [10_000_000_urls, ...]
urls.each do |url|
  # Try any of these
  spawn { http_get(url) }
  wg.spawn { http_get(url) }
  future { http_get(url) }
end
10_000_000 open files? Shit’s gonna break.
Ok, just wrap a Semaphore around limited resources. Problem solved, right?
semaphore = Semaphore.new(100) # Allow 100 parallel operations
urls.each do |url|
  spawn { semaphore.acquire { http_get(url) } }
end
This is getting complicated. There must be a better way…
(Stay tuned for Episode 2)