@[Link("curl")]
@[Link(ldflags: "#{__DIR__}/libacme.o")]
lib LibAcme
  fun curl = ripp(url : LibC::Char*) : LibC::Long
end
Urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}
fn = ->(url : String) {
  LibAcme.curl "https://#{url}"
}
chanl = Channel(Tuple(Int64, String)).new
terminate = Channel(Nil).new
done = Channel(Nil).new
Urls.each do |url|
  spawn do
    result = {fn.call(url), url}
    chanl.send result
  end
end
spawn do
  loop do
    select
    when result = chanl.receive
      puts "#{result[0]} #{result[1]}"
    when terminate.receive?
      break
    end
  end
  done.close
end
terminate.close
done.receive?
Is this the only way to get out of the loop after receiving?
What about when the number of messages in the channel is unknown? In this case it is known, and I could say Urls.size.times, but that isn't really realistic.
And to be honest: I didn't fully understand the last done.receive? until now.
Since you need to wait for a set of producer fibers and a consumer fiber to complete, I would use WaitGroup for this.
In the example below, I set up a WaitGroup for the producer fibers and the consumer fiber. The main fiber increments the producer wait group and the producer fibers increment the consumer wait group.
Then, after all of that has been set up, we wait for all of the producers to complete with producer_wg.wait, then close the result channel. It's important that all of the producers have already sent their results by then.
require "http"
require "wait_group"
urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}
channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new
urls.each do |url|
  producer_wg.add
  spawn do
    result = {HTTP::Client.get("https://#{url}").status, url}
    consumer_wg.add
    channel.send result
  ensure
    producer_wg.done
  end
end
spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end
# After all requests are completed, we can close the channel, which will allow
# the `receive?` call above to return `nil`
producer_wg.wait
channel.close
consumer_wg.wait
Notes:
Since the channel's buffer is not larger than the number of URLs being fetched concurrently, you must consume the channel while the producers are still running. That's why we run the consumer in a fiber before waiting on producer_wg; otherwise the operation would deadlock.
This channel doesn't have a buffer at all, so every send will block until receive is called in another fiber, but the same would be true if it had a buffer smaller than urls.size.
If urls.size is unknown (for example, if you’re processing results from a stream), this pattern might cut down on memory consumption.
I’ve never used libacme before, so since your function returns a LibC::Long I replaced it with HTTP::Client.get and returned the status so it would work on my machine. Feel free to update the HTTP::Client.get(url).status expression back to your LibAcme.curl(url) expression.
In Crystal 1.14, WaitGroup will be getting a friendlier API, but for now we need to call wg.done manually in an ensure block.
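For reference, here is a rough sketch of how the example above might look with that friendlier API, assuming WaitGroup#spawn (which increments the counter, spawns the fiber, and calls done for you) and the block form of WaitGroup.wait work as described in the release notes; it reuses the require lines and the urls array from the example above and is only meant as an illustration, so double-check it against the actual 1.14 API:
channel = Channel(Tuple(HTTP::Status, String)).new
consumer_wg = WaitGroup.new
spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end
# The block form yields a fresh wait group and blocks until every fiber
# spawned through it has finished, so no manual add/done/ensure is needed
# for the producers.
WaitGroup.wait do |producer_wg|
  urls.each do |url|
    producer_wg.spawn do
      result = {HTTP::Client.get("https://#{url}").status, url}
      consumer_wg.add
      channel.send result
    end
  end
end
channel.close # lets the consumer's `receive?` return nil
consumer_wg.wait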
What about when the number of messages in the channel is unknown? In this case it is known, and I could say Urls.size.times, but that isn't really realistic.
I asked this question before in the forum (although I can't find it now).
The solution given there was:
spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    ...
  end
end
But if the original code uses select with a timeout, how do we solve this? I don't consider this an equivalent replacement for the original code; we still need a way to count the number of messages, manually or automatically, or a way to break out of the loop automatically, using a WaitGroup, instead of result = channel.receive?.
As an example, how would you replace this (select + timeout) with the while pattern above?
You could probably have the select + timeout in a loop do, then use another channel that you send a value to once you're done processing all the items, which would break out of the loop?
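Something like this, as a rough sketch of that idea (the item names, sleep times, and the 0.5-second timeout are made up for illustration; the timeout branch is what ends the loop once no more results arrive, and done only lets the main fiber wait for the consumer):
results = Channel(String).new
done = Channel(Nil).new
%w{one two three}.each do |item|
  spawn do
    sleep rand(0.1..0.3) # simulate some work
    results.send item
  end
end
spawn do
  loop do
    select
    when value = results.receive
      puts value
    when timeout(0.5.seconds)
      break # nothing arrived in time, assume there is nothing left
    end
  end
  done.close # signal the main fiber that the consumer is finished
end
done.receive? # blocks until `done` is closed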
I’m not sure I’d use select / timeout with HTTP calls. We can use HTTP::Client#read_timeout for the same purpose:
require "http"
require "wait_group"
urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}
channel = Channel(Tuple(HTTP::Status, String)).new
producer_wg = WaitGroup.new
consumer_wg = WaitGroup.new
urls.each do |url|
  producer_wg.add
  spawn do
    uri = URI.parse("https://#{url}")
    http = HTTP::Client.new(uri)
    http.read_timeout = 50.milliseconds
    result = {http.get(uri.request_target).status, url}
    consumer_wg.add
    channel.send result
  rescue
    pp failed: url
  ensure
    producer_wg.done
  end
end
spawn do
  # Read from the channel until it's closed
  while result = channel.receive?
    status, url = result
    puts "#{status} #{url}"
    consumer_wg.done
  end
end
# After all requests are completed, we can close the channel, which will allow
# the `receive?` call above to return `nil`
producer_wg.wait
channel.close
consumer_wg.wait
I used 50.milliseconds here because, at that value, the request failed for some URLs but not all. Feel free to experiment with the pattern using whatever timeout does the same for you.
Note that I did have to swap the consumer_wg.add call with the channel.send call. I originally wrote them in the wrong order and I was getting Negative WaitGroup counter exceptions on failure.
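To spell out why the order matters: with an unbuffered channel, the consumer fiber can run its consumer_wg.done before the sending fiber is resumed after channel.send, so the counter has to be incremented before the hand-off:
consumer_wg.add     # increment first...
channel.send result # ...because the consumer may call `consumer_wg.done`
                    # before this fiber is scheduled again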
The real issue is: how do we know when we're done processing all the items?
I have finally found the post where I asked this before and you answered it; it's almost the same question as this one, but I still haven't found a good way to solve it.
Is this the only way to get out of the loop after receiving?
@Karl-Goethebier, your code doesn't seem to work as expected. What I mean is, it doesn't get out of the loop correctly. The following code is a reproducible example:
urls = %w{
  www.python.org
  www.cpan.org
  www.perl.org
  developer.apple.com
  www.sbcl.org
  crystal-lang.org
  www.graalvm.org
}
def producer(url)
  sleep rand(0.2..0.5)
  url
end
chan = Channel(String).new
terminate = Channel(Nil).new
done = Channel(Nil).new
urls.each do |url|
  spawn do
    result = producer(url)
    chan.send result
  end
end
spawn do
  loop do
    select
    when result = chan.receive
      puts result
    when terminate.receive?
      break
    end
  end
  done.close
end
terminate.close
done.receive?
After terminate.close runs, the select loop exits immediately. The reason is that once terminate is closed, terminate.receive? returns nil right away, so that branch is always ready and the loop can break before all results have been received.
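You can see the underlying behaviour in isolation with a tiny snippet (reduced from the code above): receive? on a closed channel returns nil immediately, so that select branch is always ready to fire:
terminate = Channel(Nil).new
terminate.close
p terminate.receive? # => nil, returned immediately instead of blocking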
For the moment I can't reproduce this. Unfortunately, I don't quite understand the question yet. My example actually worked without errors - it's just not optimal.
I think your example doesn't work either, just like mine; the two are almost identical, except that you run a C function through a lambda whereas I use a method call, with sleep to simulate a workload.
Even if yours happens to work, it's because your C function has no real workload and completes immediately, so the select never gets the opportunity to enter the when terminate.receive? branch.
Why should my C function have no payload? And why should I change my code until it doesn't work? And there is nothing wrong with the lambda.
Did you try it after compiling the lib?
If your C function had a real workload, result = chan.receive would be waiting; while that branch is waiting, the other branch, terminate.receive?, returns nil immediately, so the loop breaks and the program exits early.
This is the behavior when running my code. Have you tried it?
Is this the only way to get out of the loop after receiving?
You said this in the post, but when I tried it, it got out of the loop before receiving.