The Crystal Programming Language Forum

Am I using select properly?

Ah, the mysterious select, undocumented except for “it works like select in go”, although Crystal doesn’t look much like go :slight_smile:

I cooked up this concurrent queue container example, which is supposed to collect data with fill until it is asked for the data with drain_all, at which time it returns an array of all that it has collected. drain returns just one datum. Both should pause the calling fiber until there is at least one datum for them to supply. Channel could do this job by itself, but it doesn’t auto-grow, and you would need select to do multiple receives without blocking. I have some questions:

Am I using select properly?

Does the fiber just magically get killed when it is garbage collected?

Thanks

Bruce
# Concurrent queue container.
# A datum is added to the queue with `#fill`.
# `drain` returns one datum, `drain_all` returns an array (actually a deque)
# of all of the data that have been added since it was last called, or the
# start of this object. `drain` and `#drain_all` will not return until there
# is at least one datum.
class ConcurrentQueue(T)
  @fiber : Fiber
  @fill : Channel(T)
  @drain_all : Channel(Deque(T))
  @drain : Channel(T)

  def initialize
    @fill = Channel(T).new(capacity: 10)
    @drain = Channel(T).new(capacity: 0)
    @drain_all = Channel(Deque(T)).new(capacity: 0)

    @fiber = spawn name: "Queue(#{T.to_s}) fiber." do
      data = Deque(T).new
      data << @fill.receive
      loop do
        select
        when @drain_all.send data
          data = Deque(T).new
          data << @fill.receive
        when @drain.send data[0]
          data.shift
          data << @fill.receive if data.size == 0
        when r = @fill.receive
          data << r
        end
      end
    end
  end

  def fill(i : T)
    @fill.send(i)
  end

  def drain : T
    @drain.receive
  end

  def drain_all : Deque(T)
    @drain_all.receive
  end
end

q = ConcurrentQueue(String).new
q.fill("One")
q.fill("Two")
q.fill("Three")
Fiber.yield # Because the queue is more eager to drain than fill.
p q.drain
p q.drain_all

I’d recommend this article. 5 use cases for Crystal's select statement - lbarasti's blog

Lays out some of the best use cases for select, namely timeout and multiple receives at the same time.

As for the code, you should do something to break out of the loop, otherwise it will go on forever until the main fiber is killed, which will keep it in memory even when it’s not in use. You need to add something to tell the loop “I’m done! Stop running this fiber so I can be garbage collected at some point”. You also may want to guard against Channel::ClosedError when using Channel#send or Channel#receive, as well you should close your channels. a begin, rescue, finally block would do that nicely here.