How to wait a message on multiple channels?

Hi all.

How can I have an Array of Channels with which I want the current Fiber to wait/listen for a message on all those channels at once and exit the flow either if one of the channel received a message or if a timeout (previously defined) occurred
?

I’d look into select - Crystal.

Thanks for your answer, however select seems only appropriate when working with a defined amount of channels, in my case I have a reference to an array of channels which is unknown at compile time.

When working with an array of channels, you may need to spin your own solution to whatever behavior you want to achieve. For example: You may just want to get the messages waiting on the channels, if you care about the order you might do something like:

array_of_channels.map(&.receive?)

but if you care about a timeout you may need to do something like

array_of_channels.each do |channel|
  select
  when item = channel.receive?
    #process item
  when timeout(5.seconds)
    #Handle timeout
  end
end

However, this might still have a behavior you don’t want, it waits up to 5 seconds for each channel. Therefore you may need to do something like:

wg = WaitGroup.new(array_of_channels.size)
array_of_channels.each do |channel|
  spawn do
    begin
      select
      when item = channel.receive?
        #process item
      when timeout(5.seconds)
        #Handle timeout
      end
    end
  ensure
    wg.done
  end
end

wg.wait #Waits for all the channels to finish.

This paradigm only works if you can process the data received from the channel independently, not if you need to process the data all at once. There are a lot of ways to skin a cat on this one.

Let us know more about what you are doing and we can recommend some potential ways to make your code work.

Thanks for your response.
I think none of those solutions will match my use case.

I want to wait for any of the channels to receive a message (I’m using an Array of Channel(Nil)) , hence I want the waiting to be interrupted either as soon as one Channel receives a Nil , either if a timeout is reached while waiting.

Is there a solution for that ?

Does your use case preclude putting all these items into one channel?

I think you want to use the :nodoc: method Channel.non_blocking_select.

See usages in crystal/spec/std/channel_spec.cr at master · crystal-lang/crystal · GitHub

And in crystal/spec/std/channel_spec.cr at master · crystal-lang/crystal · GitHub you can see how to create a timeout_select_action.

Note, is an internal api as we didn’t envision arbitrary amount of channels to be a common idiom when designing the public api.

Does your use case preclude putting all these items into one channel?

@jgaskins Technically no, but design-wise I think so. If I do that I will end up with 5000 to 10000 Fibers receiving a message at the same time, all checking if this message is for them, while only a very few of them are concerned.
I’m afraid that this design could induce a cpu bound solution which I’m actually trying to avoid, considering that I’m currently evaluating Crystal for the first time in a production context to replace a Python gevented solution that becomes a bottleneck.

I think you want to use the :nodoc: method Channel.non_blocking_select .

Thanks for your response @bcardiff . I will have a look to this. Does it imply to implicitly or explicitly spawn one Fiber per channel to receive from ?

Maybe you could tell us a bit more about what you’re actually trying to do?

From the initial description I gathered you’d want one fiber to wait for a signal from either of a number of other fibers. But it seems there is more going on. Hearing about the full context would allow to help you better find a solution for your problem.

1 Like

No.

non_blocking_select is what is used by the select statement. The compiler generates a call to the former from the later. Syntax wise there is no way to have a dynamic number of whens, but the non_blocking_select works on Indexable (so Array can be used)

Well I tried to be as less verbose as possible to prevent loosing people’s time but it seems it’s too late for that.

That being said I realised that I might have another option that I should try before going further in the direction I was headed to. I’ll come back to tell if it was paying of and I might end-up competing two models to see which one is better.