Is this the only way to terminate?

Since you need to wait for a set of producer fibers and a consumer fiber to complete, I would use WaitGroup for this.

In the example below, I set up a WaitGroup for the producer fibers and the consumer fiber. The main fiber increments the producer wait group and the producer fibers increment the consumer wait group.

Then after all that’s been setup, we wait for all of the producers to complete with producer_wg.wait, then close the result channel. It’s important that the producers’ results are already in the channel by then.

require "http"
require "wait_group"

urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}

channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new

urls.each do |url|
  producer_wg.add

  spawn do
    result = {HTTP::Client.get("https://#{url}").status, url}

    channel.send result
    consumer_wg.add
  ensure
    producer_wg.done
  end
end

spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end

# After all requests are completed, we can close the channel, which will allow the
producer_wg.wait
channel.close
consumer_wg.wait

Notes:

  • Since the channel does not have a buffer size greater than the number of URLs being fetched concurrently, you must consume the channel while the producers are still running. That’s why we run it in a fiber before waiting on producer_wg. Otherwise the operation will deadlock.
    • This channel doesn’t have a buffer at all so every send will block until receive is called in another fiber, but it would be true if it had a buffer smaller than urls.size.
    • If urls.size is unknown (for example, if you’re processing results from a stream), this pattern might cut down on memory consumption.
  • I’ve never used libacme before, so since your function returns a LibC::Long I replaced it with HTTP::Client.get and returned the status so it would work on my machine. Feel free to update the HTTP::Client.get(url).status expression back to your LibAcme.curl(url) expression.
  • In Crystal 1.14, WaitGroup will be getting a friendlier API, but for now we need to call wg.done manually in an ensure block.
3 Likes