How to send HTTP requests with Channels?

Hi guys!

I'm trying to make 5 requests at once and here is my code:

require "http/client"

channel = Channel(String).new
list = %w[
  https://www.akitaonrails.com/
  https://citizen428.net/
  https://crystal-lang.org/
  https://www.akitaonrails.com/
  https://citizen428.net/
  https://crystal-lang.org/
]

list.each do |url|
  spawn do
    response = HTTP::Client.get(url)
    channel.send response.body
  end
end

puts channel.receive

But I only get 1 response.

The gist of it is you only get a single response because you only ever call channel.receive once. If you want it to print all of them, you need to call it the same number of times as there were requests. E.g.

list.size.times do
  puts channel.receive
end

You are my best friend ^_^
Thanks!

Hi, @Blacksmoke16, instead of using list.size.times ..., is there a way to use loop or while?

You could do it a number of ways, yes, but the gist of it is you need to call receive the same number of times as you spawned fibers. If you call it fewer times, you won’t get all the data back; if you call it more, it’ll block waiting for data that’ll never come.

Yes, I understand what you mean, but my concern is a different issue. Here is an example.

ch = Channel(Int32).new

[1, 2, 3].each do |i|
  spawn do # => NOTICE: spawn 3 fiber here.
    ch.send i
  end
end

# We have to hard-code 3.times or ???.size.times here
3.times do |i|
  p ch.receive
end

The above code works well, but instead of 3.times, I want to use loop/while.

ch = Channel(Int32).new

[1, 2, 3].each do |i|
  spawn do # => NOTICE: spawn 3 fiber here.
    ch.send i
  end
end

loop do
  p ch.receive?
end

But this code doesn't work: ch.receive? blocks forever because ch is never closed.

So I'm asking whether there is a more elegant way to solve this.

BTW: I don’t want to use 3.times or [1,2,3].size.times, because sometimes the size is not easy to know.

EDIT:

The following is a solution, but I guess there must be a more robust way to do this.

loop do
  break if ch.closed?

  select
  when value = ch.receive?
    p value
  when timeout 3.seconds
    puts "Timeout"
    ch.close
  end
end

You would have to keep track of how many iterations you went through and break out of the loop once you reached the number of fibers you spawned, i.e. essentially what .size.times does.
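A minimal sketch of that bookkeeping, for the case where the size isn't known up front. The names spawned, received and results are made up for illustration; the only idea is to bump a counter every time a fiber is spawned and loop until that many values have come back.

```crystal
ch = Channel(Int32).new

# Hypothetical counter: incremented once for every fiber we spawn.
spawned = 0

[1, 2, 3].each do |i|
  spawned += 1
  spawn do
    ch.send i
  end
end

results = [] of Int32
received = 0

# Receive exactly as many values as fibers were spawned, then stop.
while received < spawned
  results << ch.receive
  received += 1
end

p results.sort
```

This is the same thing as spawned.times { ... }, just written as a while loop, so it neither blocks forever nor misses a value.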

Is there a reason using that isn’t sufficient for your use case? If you’re spawning a known number of fibers then that's the easiest way to handle it. Otherwise you have to get into more robust concurrency logic using select. "5 use cases for Crystal's select statement" on lbarasti's blog has some good examples of how you can use it. The first example there might be a good solution if you want to have some work done that spawns an arbitrary number of fibers. You could have it send to a dedicated channel to signal when it's done, so the loop knows when to break.
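One way the dedicated-channel idea could look, as a rough sketch rather than the blog's exact code. Assumptions: finished is a made-up signalling channel, the sleep stands in for real I/O such as HTTP::Client.get, and a separate "closer" fiber closes ch once every worker has reported in. With that in place the consumer can simply loop on receive? until it returns nil.

```crystal
ch = Channel(Int32).new
finished = Channel(Nil).new # hypothetical "this worker is done" signal

spawned = 0
[1, 2, 3, 4].each do |i|
  spawned += 1
  spawn do
    sleep 10.milliseconds # stand-in for an I/O call that yields to other fibers
    ch.send i
    finished.send nil
  end
end

# Closer fiber: waits for every worker to report in, then closes ch
# so that receive? returns nil on the consumer side.
spawn do
  spawned.times { finished.receive }
  ch.close
end

results = [] of Int32
# The consumer does not need to know how many values are coming.
while value = ch.receive?
  results << value
end

p results.sort
```

The count still exists, but it lives next to the code that spawns the fibers instead of in the receiving loop.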

Related: [RFC] Structured Concurrency · Issue #6468 · crystal-lang/crystal · GitHub

Hi, this solution seems to work only under certain conditions.

For simplicity, see the following example:

The following code always works:

def prime?(n)
  (2...n).all? { |i| n % i != 0 }
end

def primes(n)
  (3..n).select { |x| prime?(x) }
end

ch = Channel(Int32).new
terminate = Channel(Nil).new
done = Channel(Nil).new

[1, 2, 3, 4].each do |i|
  spawn do
    # use primes to simulate a CPU-heavy task
    primes(rand(9000..10000))
    # sleep 1
    ch.send i
  end
end

spawn do
  loop do
    select
    when value = ch.receive
      p value
    when terminate.receive?
      break
    end
  end
  done.close
end

terminate.close
done.receive?

But if I uncomment the sleep 1, terminate.receive? gets selected first and the code exits early without any output.

Any thoughts?

@Blacksmoke16, I tested this on one of my recently created shards, and it doesn't seem to work as expected there either, please check.

I'm also trying to pull the original author into the discussion.

@lbarasti

I’m pretty sure it’s expected. Remember fibers do not immediately execute when they’re spawned. So in this example, when you have the sleep 1, you spawn the 4 prime fibers and another fiber with the loop + select. The terminate channel is then closed. Finally you call receive? on the done channel, which blocks the main fiber. Execution switches to one of the prime fibers, which hits the sleep, and then to the loop + select fiber. No value was sent to the ch channel yet because of the sleep, and the terminate channel was already closed, so that branch of the select executes and breaks out of the loop. That fiber then closes the done channel, which causes your original done.receive? to return nil, and the main fiber exits without doing anything else.
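A tiny sketch of the "fibers do not immediately execute" part, assuming the default single-threaded runtime (no -Dpreview_mt). The order array is only there to record what ran when.

```crystal
order = [] of String

spawn do
  order << "fiber"
end

# The spawned fiber is only queued at this point; main keeps running.
order << "main before yield"

# Explicitly give other fibers a chance to run.
Fiber.yield

order << "main after yield"

p order
```

Nothing in the spawned block runs until the main fiber blocks or yields, which is why terminate.close in the example above happens before any worker has had a chance to send.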

Yes, but this case seems inevitable? I added sleep 1 just to simulate it.

As you can see in my previous comment, HTTP::Client.get(url) is an IO operation, so it will yield to other fibers even if I never add a sleep manually.

So, is it possible to fix my code to get the expected behavior?
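For reference, one possible way to keep the terminate/done structure and still get every value, as a sketch and not necessarily the most idiomatic fix. Assumptions: finished is a made-up channel each worker signals on after its send has gone through, and the short sleep stands in for the HTTP call. The main fiber only closes terminate after all workers have signalled.

```crystal
ch = Channel(Int32).new
finished = Channel(Nil).new # hypothetical per-worker completion signal
terminate = Channel(Nil).new
done = Channel(Nil).new

jobs = [1, 2, 3, 4]
jobs.each do |i|
  spawn do
    sleep 10.milliseconds # stand-in for HTTP::Client.get, which yields on I/O
    ch.send i
    finished.send nil # only reached after the value above was received
  end
end

results = [] of Int32
spawn do
  loop do
    select
    when value = ch.receive
      results << value
    when terminate.receive?
      break
    end
  end
  done.close
end

# Wait until every worker has handed over its value, then stop the consumer.
jobs.size.times { finished.receive }
terminate.close
done.receive?

p results.sort
```

Because ch is unbuffered, a worker can only reach finished.send after its value has been taken, so terminate is never closed while a value is still outstanding.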