Fibers, channels and blocking

Struggling with some code with fibers and channels.
I’ve read :

and a few other ones (e.g Crystal concurrency guide).

What i’m trying to do :

  • main (implicit fiber) program spawn 4 fibers :
  1. First fiber simulates some input (real “stream” will be from TCP, STDIN, …)
  2. Second one receives messages from the first fiber and store them in some bufffer (e.g. of String)
  3. Third one displays the content of the buffer every n seconds, say 1.
  4. Fourth one flush the buffer every n seconds, say 10.

I would like all those fibers to act “almost” independently, and not being blocked by the different messages between them.
If the “simulation” fiber feed the buffer every 100ms, and the “storage” fiber waits messages from the “display” or “flush” fibers every 1 or 10s, the “storage” fiber is actually blocked waiting for the “ticks” of the other fibers.
Hope it’s clear enough.
My best try looks like this for now :

channel_from_sim = Channel(String).new
channel_display = Channel(Bool).new
channel_flush = Channel(Bool).new

# Input simulation
spawn do
        tick = 0
        loop do
                logstr = "proxy log bla #{tick+=1}"
                #puts logstr
                channel_from_sim.send(logstr)
                delay = Random.rand(100)
                #if tick % 10 == 0
                #       puts "Will sleep #{delay}ms..."
                #end
                sleep delay.milliseconds
        end
end

# Display action at other frequency
spawn do
        loop do
                channel_display.send(true)
                sleep 1.seconds
        end
end

# Flush action at other frequency
spawn do
        loop do
                puts Time.local
                channel_flush.send(true)
                sleep 10.seconds
        end
end

# Storage
spawn do
        buffer = [] of String
        display = false
        flush = false
        loop do
                spawn display = channel_display.receive
                spawn flush = channel_flush.receive
                buffer << channel_from_sim.receive
                if display
                        puts "SIZE : #{buffer.size}"
                        display = false
                end
                if flush
                        pp buffer
                        puts "SIZE BEFORE FLUSH: #{buffer.size}"
                        puts "FLUSH!"
                        buffer.clear
                        flush = false
                end
        end
end

This code :
spawn display = channel_display.receive
spawn flush = channel_flush.receive
it the only way i found to move towards the expected behavior.

Of course, being in the loop, it only gives the feeling of working, because it leaks memory quite fast :frowning:

So i guess it’s not the right way.
Can you provide any pointer ?
Thanks

I don’t know how to answer your question directly but I found this series very informative on concurrency in Crystal and how to test it.

Also here is a good overview without an implementation on Concurrency in Crystal.

I think you are missing knowledge of the select keyword which, by the way, isn’t documented anywhere. But it’s similar to how it works in Go.

Here’s the Storage block you can use:

# Storage
spawn do
  buffer = [] of String
  loop do
    select
    when channel_display.receive
      puts "SIZE : #{buffer.size}"
    when channel_flush.receive
      pp buffer
      puts "SIZE BEFORE FLUSH: #{buffer.size}"
      puts "FLUSH!"
      buffer.clear
      flush = false
    when elem = channel_from_sim.receive
      buffer << elem
    end
  end
end

Make sure to put a sleep at the end of the program. Here’s the complete program:

channel_from_sim = Channel(String).new
channel_display = Channel(Bool).new
channel_flush = Channel(Bool).new

# Input simulation
spawn do
  tick = 0
  loop do
    logstr = "proxy log bla #{tick += 1}"
    # puts logstr
    channel_from_sim.send(logstr)
    delay = Random.rand(100)
    # if tick % 10 == 0
    #       puts "Will sleep #{delay}ms..."
    # end
    sleep delay.milliseconds
  end
end

# Display action at other frequency
spawn do
  loop do
    channel_display.send(true)
    sleep 1.seconds
  end
end

# Flush action at other frequency
spawn do
  loop do
    puts Time.local
    channel_flush.send(true)
    sleep 10.seconds
  end
end

# Storage
spawn do
  buffer = [] of String
  loop do
    select
    when channel_display.receive
      puts "SIZE : #{buffer.size}"
    when channel_flush.receive
      pp buffer
      puts "SIZE BEFORE FLUSH: #{buffer.size}"
      puts "FLUSH!"
      buffer.clear
      flush = false
    when elem = channel_from_sim.receive
      buffer << elem
    end
  end
end

sleep
1 Like

And this is suddenly so much more elegant :slight_smile:
I found this great post, too : https://lbarasti.com/post/select_statement/ , from the same person who did the videos.
Easier to find with the right keyword ;)
Thank you all !

1 Like