Ah, the mysterious select
, undocumented except for “it works like select in go”, although Crystal doesn’t look much like go
I cooked up this concurrent queue container example, which is supposed to collect data with fill
until it is asked for the data with drain_all
, at which time it returns an array of all that it has collected. drain
returns just one datum. Both should pause the calling fiber until there is at least one datum for them to supply. Channel
could do this job by itself, but it doesn’t auto-grow, and you would need select
to do multiple receives without blocking. I have some questions:
Am I using select
properly?
Does the fiber just magically get killed when it is garbage collected?
Thanks
Bruce
# Concurrent queue container.
# A datum is added to the queue with `#fill`.
# `drain` returns one datum, `drain_all` returns an array (actually a deque)
# of all of the data that have been added since it was last called, or the
# start of this object. `drain` and `#drain_all` will not return until there
# is at least one datum.
class ConcurrentQueue(T)
@fiber : Fiber
@fill : Channel(T)
@drain_all : Channel(Deque(T))
@drain : Channel(T)
def initialize
@fill = Channel(T).new(capacity: 10)
@drain = Channel(T).new(capacity: 0)
@drain_all = Channel(Deque(T)).new(capacity: 0)
@fiber = spawn name: "Queue(#{T.to_s}) fiber." do
data = Deque(T).new
data << @fill.receive
loop do
select
when @drain_all.send data
data = Deque(T).new
data << @fill.receive
when @drain.send data[0]
data.shift
data << @fill.receive if data.size == 0
when r = @fill.receive
data << r
end
end
end
end
def fill(i : T)
@fill.send(i)
end
def drain : T
@drain.receive
end
def drain_all : Deque(T)
@drain_all.receive
end
end
q = ConcurrentQueue(String).new
q.fill("One")
q.fill("Two")
q.fill("Three")
Fiber.yield # Because the queue is more eager to drain than fill.
p q.drain
p q.drain_all