Hi all,
the code below is highly experimental - I’m trying to get to grips with the concepts.
Anyway, it performs not bad, but when I compile it with -Dpreview_mt
it crashes. I don’t really have a clue why.
require "wait_group"
module Acme
struct FPool(IN, OUT)
@fn : Proc(IN, IN)
@data : Array(IN)
@fibers : UInt8
# getter data
def initialize(action, list, workers = 10, amount = 4)
@data = list
@results = Array(OUT).new
@wg = WaitGroup.new
@in = Channel(IN).new(@data.size)
@out = Channel(OUT).new(@data.size)
@aux = Channel(IN).new(@data.size)
@signal = Channel(Nil).new
@fibers = workers
@fn = action
@wait = amount
end
def frob
# pp! @fibers
create
transmit
gather
run
@results.size
end
def results
@results.sort
end
private def create
[*1..@fibers].each do |idx|
@wg.add
fiber = spawn name: "\##{idx}" do
@signal.receive?
hobo
ensure
@wg.done
end
end
end
private def transmit
spawn do
@signal.receive?
@data.each do |item|
@in.send item
end
@in.close
end
end
private def gather
spawn do
@signal.receive?
while result = @out.receive
@results << result
# p! @results
end
end
end
private def run
@signal.close
@wg.wait
end
private def hobo
while (arg = @in.receive?)
result = aux?(arg)
if result
@out.send(result)
else
pp "*** timeout"
end
# Fiber.yield
end
end
private def aux?(arg) : (IN | Nil)
spawn do
@aux.send(@fn.call(arg))
end
# Fiber.yield
select
when result = @aux.receive?
result
when timeout(@wait.seconds)
nil
end
end
end
end
require "./fpool"
fn = ->(n : UInt32) { n *= 2_u32 }
n = 10_000_000_u32
gizmo = Acme::FPool(UInt32, UInt32)
.new(action: fn, list: [*1_u32..n], workers: 5_u8)
# pp! gizmo
pp! gizmo.frob
pp! gizmo.results[-1]
This crashes:
crystal build src/runp.cr -Dpreview_mt --no-debug --release
karl@rantanplan:~/src/crystal/futures/fpool$./runp
gizmo.results[-1] # => Unhandled exception in spawn(name: #1): Cannot allocate new fiber stack: Cannot allocate memory (RuntimeError)
from ./runp in '??'
from ./runp in '??'
from ./runp in '??' from ./runp in '??'
from ./runp in '??'
from ???
399968
But this works:
crystal build src/runp.cr --no-debug --release
karl@rantanplan:~/src/crystal/futures/fpool$ time ./runp
gizmo.frob # => 10000000
gizmo.results[-1] # => 20000000
real 0m3.941s
user 0m3.740s
sys 0m0.200s