Second fiber does not complete

Hi, I am experimenting with Crystal’s fibers, and to try out a buffered channel I wrote this:

```crystal
# generate a list of file names.
# This is the data source.
list = [] of String
(1..4).each do |i|
  list << "file#{i}"
end

# provides communication between the producing fiber and the
# consuming one.
ch = Channel(String).new(3)

# fiber1
# this fiber loads the data into the channel
# This is the producer.
spawn do
  puts "--- Entering fiber1"
  list.each do |val|
    puts "fiber1, before send #{val}"
    ch.send val
    puts "fiber1, after send #{val}"
  end
  puts "--- Exiting fiber1"
end

# fiber2
# this fiber empties the channel
# This is the consumer.
spawn do
  puts "---- Entering fiber 2"
  while val2 = ch.receive
    puts "Received: #{val2}"
  end
  puts "---------------------"   # The code does not get to here
  puts "---- Exiting fiber2"
  puts "---------------------"
end

puts "Starting...\n"

# Start the fibers
Fiber.yield

# When the control gets back here all data has been exhausted.
puts "Goodbye!!!"

# shouldn't either of these 2 restart fiber2?
Fiber.yield
ch.close 
p ch.closed?
```


Now, when I run the above, all data is produced and consumed as expected.
However, the `---- Exiting fiber2` line is never executed.

This is the output:

```text
$ crystal list-files.cr
Starting...
--- Entering fiber1
fiber1, before send file1
fiber1, after send file1
fiber1, before send file2
fiber1, after send file2
fiber1, before send file3
fiber1, after send file3
fiber1, before send file4
---- Entering fiber 2
Received: file1
Received: file2
Received: file3
Received: file4
fiber1, after send file4
--- Exiting fiber1
Goodbye!!!
true
```

Any pointer would be most welcome.
Many thanks.

When your main fiber completes, the program exits regardless of what other fibers are doing.
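If the goal is simply to stop the main fiber from finishing before another fiber does, one option is to block it on a channel until the other fiber signals completion. This is a minimal sketch assuming Crystal's default single-threaded scheduler; the `done` channel and the `events` array are illustrative names, not part of the original code:

```crystal
# `done` is a hypothetical completion signal; `events` records what ran, in order.
done = Channel(Nil).new
events = [] of String

spawn do
  events << "worker: finished"
  done.send nil # tell the main fiber this fiber's work is complete
end

# Without this receive, main could reach the end of the program and exit
# without waiting for the spawned fiber.
done.receive
events << "main: exiting"

p events # => ["worker: finished", "main: exiting"]
```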

> `# shouldn't either of these 2 restart fiber2?`

It resumes rather than restarts, but then `puts` in fiber2 yields control back to either the main fiber (outside of the `spawn` blocks) or fiber1, whichever was re-enqueued first.
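A small sketch of the resume-not-restart behavior: the spawned fiber below parks inside `gate.receive` and, when woken, continues from the next line rather than running its block again. The `gate` channel and `steps` array are hypothetical names used only for illustration:

```crystal
gate = Channel(Nil).new
steps = [] of String

spawn do
  steps << "fiber: started"
  gate.receive              # parks here until main sends on `gate`
  steps << "fiber: resumed" # execution continues here; the block does not start over
  gate.send nil             # signal main that this fiber has resumed
end

gate.send nil # unbuffered: main blocks until the fiber takes the value
gate.receive  # then waits for the fiber's reply

p steps # => ["fiber: started", "fiber: resumed"]
```

`"fiber: started"` is recorded only once: waking a fiber never re-runs the code before its blocking call.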


I’ve updated your code to give what I believe is the desired result. If you want to ensure you consume all of the data, you need to consume synchronously in the main fiber (using `receive?` rather than `receive`) and only produce in a background fiber, rather than spinning up separate fibers for both. `receive` raises `Channel::ClosedError` once the channel is closed and empty, whereas `receive?` returns `nil`, which ends the `while` loop. It’s also important that the producer fiber is the one that closes the channel when it’s done producing: this is a signal to the consumer that no more data is coming beyond what’s already in the channel. I’ve also replaced all usage of `puts` with `Log.debug`, which doesn’t yield the fiber.

```crystal
require "log"

Log.setup :debug

# generate a list of file names.
# This is the data source.
list = [] of String
(1..4).each do |i|
  list << "file#{i}"
end

# provides communication between the producing fiber and the
# consuming one.
ch = Channel(String).new(3)

# fiber1
# this fiber loads the data into the channel
# This is the producer.
spawn do
  Log.debug { "--- Entering producer" }
  list.each do |val|
    Log.debug { "producer, before send #{val}" }
    ch.send val
    Log.debug { "producer, after send #{val}" }
  end
  Log.debug { "--- Exiting producer" }
ensure
  # Closing tells the consumer that no more data is coming.
  ch.close
end

Log.debug { "Starting..." }

Log.debug { "---- Starting consumer" }
while val2 = ch.receive?
  Log.debug { "Received: #{val2}" }
end
Log.debug { "---------------------" } # Reached once receive? returns nil (channel closed and drained)
Log.debug { "---- Consumer finished" }
Log.debug { "---------------------" }

# When the control gets back here all data has been exhausted.
Log.debug { "Goodbye!!!" }
```
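The switch from `receive` to `receive?` is what lets the consumer loop end. A quick sketch on a buffered channel (no other fibers are needed, since the single send fits in the buffer) shows the difference once the channel is closed; the variable names are illustrative:

```crystal
ch = Channel(String).new(2)
ch.send "file1" # fits in the buffer, so this does not block
ch.close

first = ch.receive?  # values already buffered are still delivered after close
second = ch.receive? # closed and empty: returns nil, which would end a `while` loop

# `receive` on a closed, empty channel raises instead of returning nil.
raised = begin
  ch.receive
  false
rescue Channel::ClosedError
  true
end

p first  # => "file1"
p second # => nil
p raised # => true
```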

Alternatively, you can use a WaitGroup to ensure that both the producer and consumer fibers complete before moving on:

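```crystal
require "wait_group"

WaitGroup.wait do |wg|
  ch = Channel(String).new(3)

  wg.spawn do
    # producer: send everything, then close the channel to signal the end
    (1..4).each { |i| ch.send "file#{i}" }
  ensure
    ch.close
  end

  wg.spawn do
    # consumer: receive? returns nil once the channel is closed and drained
    while val = ch.receive?
      puts "Received: #{val}"
    end
  end
end
# By the time you get to here, both producer and consumer will have completed.
```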

With the `WaitGroup`, it’s important that you still follow the same pattern: only close the channel after you know there will be no more data coming, and only the producer side can know that. If you have multiple producers, you’ll need to wait on a second `WaitGroup` specifically for the producers before closing the channel:

```crystal
require "wait_group"
require "log"

Log.setup :debug

WaitGroup.wait do |wg|
  ch = Channel(String).new
  producer_concurrency = 10

  wg.spawn do
    # producer
    WaitGroup.wait do |producer|
      producer_concurrency.times do
        producer.spawn do
          100.times { |i| ch.send i.to_s }
        end
      end
    end
  ensure
    # The producer WaitGroup is complete by the time we get here
    ch.close
  end

  wg.spawn do
    while value = ch.receive?
      Log.debug { value }
    end
  end
end
```

Many thanks for such a prompt and thorough answer.

You have provided a lot of information and explanation in there.
This is very much appreciated.

I’m off to play with that code now :slight_smile:

Thanks again.


Thanks also for pointing this out:

> I’ve also replaced all usage of `puts` with `Log.debug`, which doesn’t yield the fiber.

How does one know how a method behaves in a fiber (yields / does not yield)?

Anything that triggers IO (read, write, accept, …), sleeps, and anything that communicates (channel, mutex, …) may switch control to another fiber. It’s only ever a _may_ because the operation might not have to wait or block, in which case it continues immediately.

For example, receiving from an empty channel has to wait, so the current fiber will yield, but another fiber sending to that channel won’t yield, because there’s already a receiver waiting.
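The “may” shows up if you record the order in which things happen. This sketch assumes the default single-threaded scheduler (no `-Dpreview_mt`); `events` and the channel are illustrative. Operations that can complete immediately keep running in the current fiber, and only the receive that actually has to wait lets the spawned fiber run:

```crystal
events = [] of String
ch = Channel(Int32).new(1)

spawn do
  events << "spawned fiber ran"
  ch.send 2 # main is already waiting to receive, so the value is handed straight over
end

ch.send 1 # the buffer has room: completes without switching fibers
events << "main: send done"

ch.receive # a value is buffered: returns immediately, no switch
events << "main: first receive done"

ch.receive # the buffer is empty: main has to wait, so the spawned fiber runs now
events << "main: second receive done"

p events
# => ["main: send done", "main: first receive done", "spawned fiber ran", "main: second receive done"]
```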


Thank you.