Hi, I am experimenting with Crystal’s fibers, and to test a buffered channel I wrote this:
# generate a list of file names.
# This is the data source.
list = [] of String
(1..4).each do |i|
  list << "file#{i}"
end

# provides communication between the producing fiber and the
# consuming one.
ch = Channel(String).new(3)

# fiber1
# this fiber loads the data into the channel
# This is the producer.
spawn do
  puts "--- Entering fiber1"
  list.each do |val|
    puts "fiber1, before send #{val}"
    ch.send val
    puts "fiber1, after send #{val}"
  end
  puts "--- Exiting fiber1"
end

# fiber2
# this fiber empties the channel
# This is the consumer.
spawn do
  puts "---- Entering fiber 2"
  while val2 = ch.receive
    puts "Received: #{val2}"
  end
  puts "---------------------" # The code does not get to here
  puts "---- Exiting fiber2"
  puts "---------------------"
end

puts "Starting...\n"
# Start the fibers
Fiber.yield
# When the control gets back here all data has been exhausted.
puts "Goodbye!!!"
# shouldn't either of these 2 restart fiber2?
Fiber.yield
ch.close
p ch.closed?
Now, when I run the above, all data is produced and consumed as expected.
However, the ‘---- Exiting fiber2’ line is never printed.
This is the output:
$ crystal list-files.cr
Starting...
--- Entering fiber1
fiber1, before send file1
fiber1, after send file1
fiber1, before send file2
fiber1, after send file2
fiber1, before send file3
fiber1, after send file3
fiber1, before send file4
---- Entering fiber 2
Received: file1
Received: file2
Received: file3
Received: file4
fiber1, after send file4
--- Exiting fiber1
Goodbye!!!
true
When your main fiber completes, the program exits regardless of what other fibers are doing.
# shouldn't either of these 2 restart fiber2?
The fiber is resumed rather than restarted, but the puts calls in fiber2 can then yield control back to either the main fiber (the code outside the spawn blocks) or fiber1, whichever was re-enqueued first.
I’ve updated your code to give what I believe is the desired result. To make sure you consume all of the data, consume synchronously in the main fiber and run only the producer in a background fiber, rather than spinning up separate fibers for both. Use receive? rather than receive in the loop: receive? returns nil once the channel is closed and drained, which ends the while loop, whereas receive raises Channel::ClosedError. It’s also important that the producer fiber is the one that closes the channel when it’s done producing. Closing is the signal to the consumer that no more data is coming beyond what’s already in the channel. I’ve also replaced all usage of puts with Log.debug, which doesn’t yield the fiber.
require "log"
Log.setup :debug

# generate a list of file names.
# This is the data source.
list = [] of String
(1..4).each do |i|
  list << "file#{i}"
end

# provides communication between the producing fiber and the
# consuming one.
ch = Channel(String).new(3)

# fiber1
# this fiber loads the data into the channel
# This is the producer.
spawn do
  Log.debug { "--- Entering producer" }
  list.each do |val|
    Log.debug { "producer, before send #{val}" }
    ch.send val
    Log.debug { "producer, after send #{val}" }
  end
  Log.debug { "--- Exiting producer" }
ensure
  # Closing here tells the consumer that no more data is coming.
  ch.close
end

Log.debug { "Starting...\n" }

# The consumer runs in the main fiber, so the program cannot exit
# before the channel has been drained.
Log.debug { "---- Starting consumer" }
while val2 = ch.receive?
  Log.debug { "Received: #{val2}" }
end
Log.debug { "---------------------" } # The code now reaches this point
Log.debug { "---- Consumer finished" }
Log.debug { "---------------------" }

# When the control gets back here all data has been exhausted.
Log.debug { "Goodbye!!!" }
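As a side note on the loop condition above, the difference between receive? and receive on a closed channel can be checked in isolation. This is only a minimal sketch; the channel capacity, the values, and the `received` array are made up for illustration:

```crystal
# A buffered channel with room for both values, so the sends below
# do not block the main fiber.
ch = Channel(String).new(2)
ch.send "file1"
ch.send "file2"
ch.close

# Values that were buffered before the close are still delivered.
# receive? returns nil only once the channel is closed and empty,
# which ends the loop.
received = [] of String
while val = ch.receive?
  received << val
end
p received # => ["file1", "file2"]

# receive, by contrast, raises when the channel is closed and empty.
begin
  ch.receive
rescue Channel::ClosedError
  puts "receive raised Channel::ClosedError"
end
```

This is why a `while val = ch.receive` loop has no clean way to finish: it either blocks waiting for more data or raises once the channel is closed.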
Alternatively, you can use a WaitGroup to ensure that both the producer and consumer fibers complete before moving on:
require "wait_group"

WaitGroup.wait do |wg|
  ch = Channel(String).new(3)

  wg.spawn do
    # producer
  end

  wg.spawn do
    # consumer
  end
end
# By the time you get to here, both producer and consumer will have completed.
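For reference, here is one way the skeleton above might be filled in for the single-producer case from the original question. It is a sketch rather than a definitive version: it assumes a Crystal release that ships WaitGroup.wait and WaitGroup#spawn (as the skeleton does), and the `received` array is a hypothetical stand-in for whatever the consumer would really do with each value:

```crystal
require "wait_group"

# Same data source as in the question: file1..file4.
list = (1..4).map { |i| "file#{i}" }

# Hypothetical sink so the result can be inspected afterwards.
received = [] of String

WaitGroup.wait do |wg|
  ch = Channel(String).new(3)

  # Producer: sends every value, then closes the channel so the
  # consumer knows that nothing more is coming.
  wg.spawn do
    list.each { |val| ch.send val }
  ensure
    ch.close
  end

  # Consumer: receive? returns nil once the channel is closed and
  # drained, which ends the loop and lets this fiber finish.
  wg.spawn do
    while val = ch.receive?
      received << val
    end
  end
end

# Both fibers have completed by the time we get here.
p received # => ["file1", "file2", "file3", "file4"]
```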
With the WaitGroup, it’s important that you still follow the same pattern: only close the channel once you know no more data is coming, which can only be known on the producer side. If you have multiple producers, you’ll need to wait on a second WaitGroup specifically for the producers before closing the channel:
require "wait_group"
require "log"
Log.setup :debug

WaitGroup.wait do |wg|
  ch = Channel(String).new
  producer_concurrency = 10

  wg.spawn do
    # producer
    WaitGroup.wait do |producer|
      producer_concurrency.times do
        producer.spawn do
          100.times { |i| ch.send i.to_s }
        end
      end
    end
  ensure
    # The producer WaitGroup is complete by the time we get here
    ch.close
  end

  wg.spawn do
    while value = ch.receive?
      Log.debug { value }
    end
  end
end
Anything that triggers IO (read, write, accept, …), sleeps, or communicates (channel, mutex, …) may switch control to another fiber. It’s only ever a _may_, because the operation might not have to wait or block, in which case the fiber continues immediately.
For example, receiving from an empty channel has to wait, so the current fiber will yield, but another fiber sending to that channel won’t yield, because there’s a receiver waiting.
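The channel example can be observed with a small experiment. This is a sketch that assumes the default single-threaded scheduler; the `events` array is a hypothetical log used instead of puts, so that no IO takes part in the scheduling:

```crystal
# Hypothetical in-memory log; appending to an array never switches fibers.
events = [] of String

# Unbuffered channel: a receive with no pending sender has to wait.
ch = Channel(Int32).new

spawn do
  events << "sender: before send"
  # The main fiber is already waiting in receive, so this send can
  # hand the value over and carry on without yielding.
  ch.send 1
  events << "sender: after send"
end

events << "main: before receive"
# The channel is empty, so the main fiber yields here and the
# spawned fiber gets to run.
value = ch.receive
events << "main: after receive #{value}"

events.each { |event| puts event }
```

If the description above holds, both sender lines are recorded between the two main lines: the main fiber gives up control at receive, and the sender runs to completion before the main fiber is resumed.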