I’m struggling with some code that uses fibers and channels.
I’ve read:
and a few other ones (e.g. the Crystal concurrency guide).
What I’m trying to do:
- The main program (the implicit fiber) spawns 4 fibers:
  - The first fiber simulates some input (the real “stream” will come from TCP, STDIN, …)
  - The second one receives messages from the first fiber and stores them in a buffer (e.g. an Array of String)
  - The third one displays the content of the buffer every n seconds, say 1.
  - The fourth one flushes the buffer every n seconds, say 10.
I would like all those fibers to act “almost” independently, and not be blocked by the messages exchanged between them.
If the “simulation” fiber feeds the buffer every 100ms, and the “storage” fiber waits for messages from the “display” or “flush” fibers every 1 or 10s, the “storage” fiber is actually blocked waiting for the “ticks” of the other fibers.
Hope it’s clear enough.
My best attempt so far looks like this:
channel_from_sim = Channel(String).new
channel_display = Channel(Bool).new
channel_flush = Channel(Bool).new

# Input simulation
spawn do
  tick = 0
  loop do
    logstr = "proxy log bla #{tick += 1}"
    #puts logstr
    channel_from_sim.send(logstr)
    delay = Random.rand(100)
    #if tick % 10 == 0
    #  puts "Will sleep #{delay}ms..."
    #end
    sleep delay.milliseconds
  end
end

# Display action at other frequency
spawn do
  loop do
    channel_display.send(true)
    sleep 1.seconds
  end
end

# Flush action at other frequency
spawn do
  loop do
    puts Time.local
    channel_flush.send(true)
    sleep 10.seconds
  end
end

# Storage
spawn do
  buffer = [] of String
  display = false
  flush = false
  loop do
    spawn display = channel_display.receive
    spawn flush = channel_flush.receive
    buffer << channel_from_sim.receive
    if display
      puts "SIZE : #{buffer.size}"
      display = false
    end
    if flush
      pp buffer
      puts "SIZE BEFORE FLUSH: #{buffer.size}"
      puts "FLUSH!"
      buffer.clear
      flush = false
    end
  end
end
This code:
spawn display = channel_display.receive
spawn flush = channel_flush.receive
is the only way I found to move towards the expected behavior.
Of course, since it sits inside the loop, it only gives the impression of working: it spawns two new fibers on every iteration and leaks memory quite fast.
So I guess it’s not the right way.
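One direction that might fit is Crystal's `select` expression, which blocks until any one of several channel operations is ready and then runs only that branch. The following is a minimal sketch under that assumption, not a verified solution: `storage_step` is a hypothetical helper name, the channel names are reused from the code above, and the 3 second run time at the end is arbitrary.

```crystal
# Hypothetical helper (not part of any library): handle exactly one event,
# from whichever channel becomes ready first.
def storage_step(buffer : Array(String),
                 from_sim : Channel(String),
                 display : Channel(Bool),
                 flush : Channel(Bool)) : Nil
  select
  when msg = from_sim.receive
    buffer << msg
  when display.receive
    puts "SIZE : #{buffer.size}"
  when flush.receive
    puts "SIZE BEFORE FLUSH: #{buffer.size}"
    puts "FLUSH!"
    buffer.clear
  end
end

channel_from_sim = Channel(String).new
channel_display = Channel(Bool).new
channel_flush = Channel(Bool).new

# Input simulation: one message every 0 to 99 ms
spawn do
  tick = 0
  loop do
    channel_from_sim.send("proxy log bla #{tick += 1}")
    sleep Random.rand(100).milliseconds
  end
end

# Display tick every second
spawn do
  loop do
    channel_display.send(true)
    sleep 1.seconds
  end
end

# Flush tick every 10 seconds
spawn do
  loop do
    channel_flush.send(true)
    sleep 10.seconds
  end
end

# Storage: a single fiber, with no nested spawn inside the loop
spawn do
  buffer = [] of String
  loop { storage_step(buffer, channel_from_sim, channel_display, channel_flush) }
end

# Assumption: let the demo run for a few seconds, then exit.
sleep 3.seconds
```

With this shape the storage fiber blocks in a single place, so a pending display or flush tick does not delay incoming messages, and no extra fiber is created per iteration.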
Can you provide any pointers?
Thanks