Select with an arbitrary collection of channels and a timeout

How can I do:

select
when x = channels[0].receive
  handle_receipt 0, x
when x = channels[1].receive
  handle_receipt 1, x
when timeout(30.seconds)
  handle_timeout
end

…but with the size of the array of channels dynamically determined at runtime?

Looking in src/channel.cr, I think there’s a way I could create an array of SelectAction objects and pass those to Channel.non_blocking_select(), but it’s not clear exactly how I’d do that.

Does anyone have an example of doing this that I could cheat from?

Paul

It’s better to use the select statement than to call Channel.non_blocking_select directly.
If there is an upper limit on the number of channels you want to use, you should be able to use an additional channel to fill the unused slots. Something similar to what was proposed in https://github.com/crystal-lang/crystal/issues/8677#issuecomment-574313416

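If you still want to call the channel API directly, note that a select with a timeout branch is a blocking select, so the underlying call is Channel.select rather than Channel.non_blocking_select (the non-blocking variant backs a select with an else branch, and a timeout action never fires there):

i, m = Channel.select(
  ch1.receive_select_action,
  ch2.receive_select_action,
  timeout_select_action(30.seconds)
)

or

select_actions = [ch1.receive_select_action,
  ch2.receive_select_action,
  timeout_select_action(30.seconds)]
# or, for a collection whose size is only known at runtime
select_actions = channels.map(&.receive_select_action) + [timeout_select_action(30.seconds)]

# i is the index of the action that fired, m is its result (nil for the timeout)
i, m = Channel.select(select_actions)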

https://gist.github.com/bcardiff/289953a80eb3a0512a2a2f8c8dfeb1db and https://github.com/crystal-lang/crystal/pull/8506 should give you the background information.
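
For anyone landing here later, a minimal end-to-end sketch of the array approach might look like the following. It assumes the undocumented receive_select_action, timeout_select_action and Channel.select(ops) methods behave as they do in the stdlib source at the time of writing; they could change between Crystal versions. Channel.select is used because it blocks until an action fires, which is what a timeout needs. The channel count, the delay and the value 42 are made up for illustration.

```crystal
# Hypothetical setup: three Int32 channels. In a real program the
# number of channels would be determined at runtime.
channels = Array.new(3) { Channel(Int32).new }

# A producer fiber that sends on the second channel after a short delay.
spawn do
  sleep 10.milliseconds
  channels[1].send 42
end

# One receive action per channel, with the timeout action appended last.
# These methods are undocumented stdlib API (assumption: still present).
actions = channels.map(&.receive_select_action) + [timeout_select_action(1.second)]

# Blocks until one action fires. `index` is the position of that action
# in `actions`; `value` is the received value, or nil for the timeout.
index, value = Channel.select(actions)

if index < channels.size
  puts "received #{value} from channel #{index}"
else
  puts "timed out"
end
```

Because the timeout action is the last element, any index smaller than channels.size identifies the channel that delivered the value.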

Thank you, that is very helpful!

Since I could have hundreds of channels, I don’t think that enumerating them in separate when statements is a good idea. I thought of using a macro, but it still seems very awkward.

I know the Channel API isn’t documented (and therefore could change) but that’s a risk I don’t mind taking for this project.

Thanks again!

Paul

Maybe you shouldn’t have that many channels? Note that it’s safe to share a single channel among multiple fibers, for both sending and receiving.
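
As a rough sketch of what sharing looks like (the worker count, the job values and the "double it" work are placeholders, not anything from the original problem): several worker fibers receive from one jobs channel and all send to one results channel.

```crystal
jobs = Channel(Int32).new
results = Channel(Int32).new

# Three worker fibers all receive from the same `jobs` channel
# and all send to the same `results` channel.
3.times do
  spawn do
    # receive? returns nil once the channel is closed and drained.
    while job = jobs.receive?
      results.send job * 2 # placeholder for real work
    end
  end
end

# A single producer fiber feeds the shared channel, then closes it.
spawn do
  5.times { |i| jobs.send i }
  jobs.close
end

# Collect exactly as many results as jobs were sent.
collected = Array(Int32).new(5) { results.receive }
p collected.sort
```

Each job is delivered to exactly one worker, so no select over many channels is needed on the receiving side.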

Good point. I’ll see if I can change my design to use a simpler approach. When I was first thinking it through, it seemed pretty difficult: I thought I would need to lock and unlock the state data for each “job”, and either re-queue or otherwise deal with the lack of concurrency, since I can’t run two jobs using the same state data simultaneously. By using a channel for each set of state data, I thought I’d solved that problem…

Thank you,

Paul

Thinking about it, I can track in the primary job publisher whether a job is in flight for a given set of state data, and only send jobs to the channel that are eligible to run immediately. No locking needed, no unnecessarily idle fibers.
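
In case it helps someone else, here is a small sketch of that idea, not my actual code. The Job record, its key and payload fields, the done channel and the sample jobs are all hypothetical names invented for the example. The publisher keeps a set of keys that are in flight and only sends a job when its key is free.

```crystal
# Hypothetical job type: `key` identifies the set of state data.
record Job, key : String, payload : Int32

jobs = Channel(Job).new(16)    # shared by all workers
done = Channel(String).new(16) # workers report the key they finished

2.times do
  spawn do
    while job = jobs.receive?
      # ... work on the state identified by job.key would go here ...
      done.send job.key
    end
  end
end

pending = [Job.new("a", 1), Job.new("a", 2), Job.new("b", 3)]
in_flight = Set(String).new
finished = 0
total = pending.size

while finished < total
  # Send every pending job whose key is not in flight; keep the rest.
  pending.reject! do |job|
    next false if in_flight.includes?(job.key)
    in_flight << job.key
    jobs.send job
    true
  end

  # Wait for one completion, which frees that key for its next job.
  in_flight.delete done.receive
  finished += 1
end

jobs.close
puts "all #{finished} jobs finished"
```

Only the publisher touches in_flight and pending, so there is nothing to lock, and two jobs with the same key are never handed to workers at the same time.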

Thanks again!

Paul

It might help if you describe what you are trying to achieve, instead of asking how to implement the solution you have in mind.

Ahh, I apologize for not being clear, and I appreciate your reminder that it’s important to give as much context as possible with a question.

I’m not trying to achieve any particular goal in a way that is “best” by any measure; I’m trying to learn Crystal and understand how Channels work. My goal was already achieved in another language before I started trying to do it in Crystal, so I don’t even need to complete this. I just want to learn. :)

Thanks to everyone for your help, I have learned what I needed to know.

Paul