Since you need to wait for a set of producer fibers and a consumer fiber to complete, I would use WaitGroup for this.
In the example below, I set up a WaitGroup for the producer fibers and the consumer fiber. The main fiber increments the producer wait group and the producer fibers increment the consumer wait group.
Then after all that’s been setup, we wait for all of the producers to complete with producer_wg.wait, then close the result channel. It’s important that the producers’ results are already in the channel by then.
require "http"
require "wait_group"
urls = %w{
www.python.org
www.cpan.org
www.perl.org
developer.apple.com
www.sbcl.org
crystal-lang.org
www.graalvm.org
}
channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new
urls.each do |url|
producer_wg.add
spawn do
result = {HTTP::Client.get("https://#{url}").status, url}
channel.send result
consumer_wg.add
ensure
producer_wg.done
end
end
spawn do
# Read from the channel until it's closed
while result = channel.receive?
status, url = result
puts "#{status} #{url}"
consumer_wg.done
end
end
# After all requests are completed, we can close the channel, which will allow the
producer_wg.wait
channel.close
consumer_wg.wait
Notes:
- Since the channel does not have a buffer size greater than the number of URLs being fetched concurrently, you must consume the channel while the producers are still running. That’s why we run it in a fiber before waiting on
producer_wg. Otherwise the operation will deadlock.- This channel doesn’t have a buffer at all so every
sendwill block untilreceiveis called in another fiber, but it would be true if it had a buffer smaller thanurls.size. - If
urls.sizeis unknown (for example, if you’re processing results from a stream), this pattern might cut down on memory consumption.
- This channel doesn’t have a buffer at all so every
- I’ve never used
libacmebefore, so since your function returns aLibC::LongI replaced it withHTTP::Client.getand returned the status so it would work on my machine. Feel free to update theHTTP::Client.get(url).statusexpression back to yourLibAcme.curl(url)expression. - In Crystal 1.14,
WaitGroupwill be getting a friendlier API, but for now we need to callwg.donemanually in anensureblock.