Is this the only way to terminate?

Hi all,

please consider this:

@[Link("curl")]
@[Link(ldflags: "#{__DIR__}/libacme.o")]

lib LibAcme
  fun curl = ripp(url : LibC::Char*) : LibC::Long
end

Urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}

fn = ->(url : String) {
  LibAcme.curl "https://#{url}"
}

chanl = Channel(Tuple(Int64, String)).new
terminate = Channel(Nil).new
done = Channel(Nil).new

Urls.each do |url|
  gizmo = spawn do
    result = {(fn.call(url)), url}
    chanl.send result
  end
end

spawn do
  loop do
    select
    when result = chanl.receive
      puts "#{result[0]} #{result[1]}"
    when terminate.receive?
      break
    end
  end
  done.close
end

terminate.close
done.receive?

Is this the only way to get out of the loop after receiving?

When the number of messages in the channel is unknown - in this case it is known and I could say Urls.size.times, but that isn’t really realistic.

And to be honest: I didn’t fully understand the last done.receive? until now.

Since you need to wait for a set of producer fibers and a consumer fiber to complete, I would use WaitGroup for this.

In the example below, I set up a WaitGroup for the producer fibers and the consumer fiber. The main fiber increments the producer wait group and the producer fibers increment the consumer wait group.

Then, after all that’s been set up, we wait for all of the producers to complete with producer_wg.wait, then close the result channel. It’s important that the producers’ results are already in the channel by then.

require "http"
require "wait_group"

urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}

channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new

urls.each do |url|
  producer_wg.add

  spawn do
    result = {HTTP::Client.get("https://#{url}").status, url}

    # Increment before sending so the consumer's `done` can never run first
    consumer_wg.add
    channel.send result
  ensure
    producer_wg.done
  end
end

spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end

# After all requests are completed, we can close the channel, which will allow
# the `receive?` call above to return `nil`
producer_wg.wait
channel.close
consumer_wg.wait

Notes:

  • Since the channel’s buffer is smaller than the number of URLs being fetched concurrently, you must consume the channel while the producers are still running. That’s why we run the consumer in a fiber before waiting on producer_wg. Otherwise the program will deadlock.
    • This channel doesn’t have a buffer at all, so every send blocks until receive is called in another fiber, but the same would be true for any buffer smaller than urls.size.
    • If urls.size is unknown (for example, if you’re processing results from a stream), this pattern might cut down on memory consumption.
  • I’ve never used libacme before, so since your function returns a LibC::Long I replaced it with HTTP::Client.get and returned the status so it would work on my machine. Feel free to update the HTTP::Client.get(url).status expression back to your LibAcme.curl(url) expression.
  • In Crystal 1.14, WaitGroup will be getting a friendlier API, but for now we need to call wg.done manually in an ensure block.
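
To illustrate the buffering note above, here is a minimal sketch, assuming simulated work in place of the HTTP requests (the items array and the upcased results are made up for illustration). With a buffer of items.size, every send completes without a receiver, so the main fiber can wait for all producers first and drain the channel afterwards without deadlocking:

```crystal
require "wait_group"

# Hypothetical stand-in for the URL list.
items = %w[a b c d]

# Buffer capacity equals the number of producers, so no `send` ever blocks.
channel = Channel(String).new(items.size)
wg = WaitGroup.new

items.each do |item|
  wg.add
  spawn do
    channel.send item.upcase
  ensure
    wg.done
  end
end

# Safe only because of the buffer: nobody is receiving yet.
wg.wait
channel.close

# A closed channel still hands out its buffered values before returning nil.
results = [] of String
while value = channel.receive?
  results << value
end

puts results.sort.join(" ")
```

The trade-off is memory: all results sit in the buffer at once, which is why consuming concurrently is probably the better fit when the number of items is unknown.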

Ah, thanks. Like in golang with wg.Add(1), wg.Wait() and wg.Done(), right?

For completeness, here is the little lib:

#include <curl/curl.h>

/* Sends a body-less request to url and returns the HTTP status code.
   The result stays 0 if no response was received. */
long int ripp(const char *url) {
  long status = 0;
  CURL *curl = curl_easy_init();
  if (!curl)
    return status;
  const char *uagent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15) "
                       "AppleWebKit/605.1.15 (KHTML, like Gecko) "
                       "Version/13.0 Safari/605.1.15";
  curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2L); /* seconds */
  curl_easy_setopt(curl, CURLOPT_URL, url);
  curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
  curl_easy_setopt(curl, CURLOPT_USERAGENT, uagent);
  curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
  curl_easy_perform(curl);
  curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
  curl_easy_cleanup(curl); /* release the easy handle */
  return status;
}

I built it this way 1. for fun and learning purposes, as a proof of concept, and 2. because HTTP::Client doesn’t follow redirects as far as I know.

I learned a lot with just two posts. Thanks.

Bingo. Like a lot of concurrency features in the Crystal stdlib, the WaitGroup class was heavily inspired by sync.WaitGroup in Golang.
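
For comparison, a minimal sketch of the three calls side by side, assuming simulated work (the sleep and the completed counter are only there for illustration):

```crystal
require "wait_group"

wg = WaitGroup.new
completed = Atomic(Int32).new(0)

5.times do
  wg.add # like wg.Add(1) in Go
  spawn do
    sleep 10.milliseconds # simulated work
    completed.add(1)
  ensure
    wg.done # like `defer wg.Done()` in Go
  end
end

wg.wait # like wg.Wait(): blocks until the counter drops back to zero
puts completed.get
```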

This is also true. There’s an issue for it, but it hasn’t been implemented.
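
Until that lands, redirects can be followed by hand. Here is a rough sketch, assuming a hypothetical helper named get_following_redirects (not part of the stdlib) and a throwaway local server so that it runs offline:

```crystal
require "http/client"
require "http/server"

# Hypothetical helper, not a stdlib API: follows at most `limit` redirects.
def get_following_redirects(url : String, limit : Int32 = 5) : HTTP::Client::Response
  uri = URI.parse(url)
  limit.times do
    response = HTTP::Client.get(uri)
    if response.status.redirection? && (location = response.headers["Location"]?)
      # Location may be relative, so resolve it against the current URI.
      uri = uri.resolve(URI.parse(location))
    else
      return response
    end
  end
  raise "more than #{limit} redirects for #{url}"
end

# Throwaway local server for the demo: /old redirects to /new.
server = HTTP::Server.new do |context|
  if context.request.path == "/old"
    context.response.status = HTTP::Status::FOUND
    context.response.headers["Location"] = "/new"
  else
    context.response.print "arrived"
  end
end
address = server.bind_unused_port
spawn { server.listen }

response = get_following_redirects("http://#{address}/old")
puts "#{response.status_code} #{response.body}"
server.close
```

Swapping HTTP::Client.get for this helper in the producer fibers should be enough to get behavior close to CURLOPT_FOLLOWLOCATION, though it makes no attempt to handle redirect loops beyond the hop limit.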

I have a question about this.

When the number of messages in the channel is unknown - in this case it is known and I could say Urls.size.times, but that isn’t really realistic.

I asked this question on the forum before (although I can’t find the post).

Current given solution is:

spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    ...
  end
end

But if the original code uses select with a timeout, how do we solve this? I don’t consider this an equivalent replacement for the original code. We still need a way to count the original number of items, manually or automatically, or a way to break out of the loop automatically using WaitGroup instead of result = channel.receive?.

For example, how would I replace this (select + timeout) using the while pattern above?

Thanks.

You could probably put the select + timeout in a loop do, then use another channel and signal on it once all the items have been processed, which would break out of the loop.
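
A minimal sketch of that idea, assuming simulated work and made-up names (items, results, finished). A small coordinator fiber waits for the producers and then closes the second channel, which is what ends the loop:

```crystal
require "wait_group"

items = %w[one two three] # hypothetical work items
results = Channel(String).new
finished = Channel(Nil).new
wg = WaitGroup.new

items.each do |item|
  wg.add
  spawn do
    sleep 20.milliseconds # simulated work
    results.send item
  ensure
    wg.done
  end
end

# Coordinator: an unbuffered `send` only returns once the value has been
# handed to the receiver, so after `wg.wait` no item is still in flight.
spawn do
  wg.wait
  finished.close
end

received = [] of String
timeouts = 0

loop do
  select
  when item = results.receive
    received << item
  when finished.receive?
    break
  when timeout(5.milliseconds)
    timeouts += 1 # nothing arrived in time; keep waiting (or give up here)
  end
end

puts "received #{received.size} items"
```

The important part is that finished is only closed after the producers are done. Closing it up front would make its branch ready immediately and end the loop early.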

I’m not sure I’d use select / timeout with HTTP calls. We can use HTTP::Client#read_timeout for the same purpose:

require "http"
require "wait_group"

urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}

channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new

urls.each do |url|
  producer_wg.add

  spawn do
    uri = URI.parse("https://#{url}")
    http = HTTP::Client.new(uri)
    http.read_timeout = 50.milliseconds
    result = {http.get(uri.request_target).status, url}

    consumer_wg.add
    channel.send result
  rescue
    pp failed: url
  ensure
    producer_wg.done
  end
end

spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end

# After all requests are completed, we can close the channel, which will allow
# the `receive?` call above to return `nil`
producer_wg.wait
channel.close
consumer_wg.wait

I used 50.milliseconds here because it failed for some but not all. Feel free to experiment with the pattern using whatever timeout does the same for you.

Note that the consumer_wg.add call has to come before the channel.send call. I originally wrote them in the wrong order and was getting Negative WaitGroup counter exceptions on failure.

The real issue is: how do we know when all the items have been processed?

I finally found the post where I asked this before and you answered; it is almost the same question as this one. But I still haven’t found a good way to solve it.

Is this the only way to get out of the loop after receiving?

@Karl-Goethebier, your code doesn’t seem to work as expected. What I mean is that it doesn’t get out of the loop correctly. The following code is a reproducible example:

urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}

def producer(url)
  sleep rand(0.2..0.5).seconds
  url
end

chan = Channel(String).new
terminate = Channel(Nil).new
done = Channel(Nil).new

urls.each do |url|
  spawn do
    result = producer(url)
    chan.send result
  end
end

spawn do
  loop do
    select
    when result = chan.receive
      puts result
    when terminate.receive?
      break
    end
  end

  done.close
end

terminate.close
done.receive?

As soon as terminate.close has run, the select loop exits immediately. The reason is that terminate.receive? is ready right away on a closed channel, while chan.receive is still waiting for a value.
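
A stripped-down sketch of just that effect, assuming a single slow producer (the names data and chosen are made up for illustration):

```crystal
data = Channel(String).new
terminate = Channel(Nil).new

# A producer that needs some time before it can send.
spawn do
  sleep 50.milliseconds
  data.send "late result"
end

terminate.close

chosen = ""
select
when msg = data.receive
  chosen = "data: #{msg}"
when terminate.receive?
  # Ready immediately: `receive?` on a closed, empty channel returns nil.
  chosen = "terminate"
end

puts chosen # prints "terminate"
```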

Does the same code behave correctly in Go?

For the moment I can’t reproduce this. Unfortunately, I don’t quite understand the question yet. My example actually worked without errors - it’s just not optimal.

I thought your example didn’t work, just like mine. The two are almost identical, except that you run a C function through a lambda where I use a method call, with sleep simulating the payload.

Even if yours happens to work, that’s because your C function has no payload and completes immediately, so the select never gets the chance to enter the terminate.receive? branch.

Why should my C function have no payload? And why should I change my code until it doesn’t work? There is nothing wrong with the lambda.
Did you try compiling the lib first?

clang -c -o src/libacme.o src/libacme.c

Why should my C function have no payload?

If your C function has a payload, result = chan.receive will wait. While that branch is waiting, the other branch, terminate.receive?, returns nil immediately, so the loop breaks and the program exits early.

This is the behavior I see when running my code. Have you tried it?

Is this the only way to get out of the loop after receiving?

You said this in the post, but when I tried it, the loop exited before receiving.
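
One possible way around this, sketched under the assumption that the producers can be tracked with a WaitGroup: drop the separate terminate channel, close the data channel itself once all producers are done, and use receive? inside the select so that nil means "closed and empty". The shortened URL list, the simulated payload and the one-second timeout are placeholders:

```crystal
require "wait_group"

urls = %w[www.python.org www.perl.org crystal-lang.org]

chan = Channel(String).new
done = Channel(Nil).new
wg = WaitGroup.new

urls.each do |url|
  wg.add
  spawn do
    sleep rand(0.02..0.05).seconds # simulated payload
    chan.send url
  ensure
    wg.done
  end
end

seen = [] of String

spawn do
  loop do
    select
    when result = chan.receive?
      if result
        seen << result
      else
        break # nil: the channel is closed and every value was delivered
      end
    when timeout(1.second)
      break # hypothetical safety net in case the producers hang
    end
  end
  done.close
end

wg.wait       # all sends have completed, so nothing is still in flight
chan.close    # lets the consumer's `receive?` return nil
done.receive? # block until the consumer has left its loop

puts seen.size
```

With this arrangement the loop cannot end before the last value has been received, because the close only happens after every send has returned.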