Why does it crash with -Dpreview_mt?

Hi all,

The code below is highly experimental - I’m trying to get to grips with the concepts.

Anyway, it performs reasonably well, but when I compile it with -Dpreview_mt it crashes. I don’t really have a clue why.

require "wait_group"

module Acme
  struct FPool(IN, OUT)
    @fn : Proc(IN, OUT)
    @data : Array(IN)
    @fibers : UInt8

    # getter data                                                                                                 

    def initialize(action, list, workers = 10, amount = 4)
      @data = list
      @results = Array(OUT).new
      @wg = WaitGroup.new
      @in = Channel(IN).new(@data.size)
      @out = Channel(OUT).new(@data.size)
      @aux = Channel(OUT).new(@data.size)
      @signal = Channel(Nil).new
      @fibers = workers
      @fn = action
      @wait = amount
    end

    def frob
      # pp! @fibers                                                                                               
      create
      transmit
      gather
      run
      @results.size
    end
    
        
    def results
      @results.sort
    end
        
    private def create
      [*1..@fibers].each do |idx|
        @wg.add
        fiber = spawn name: "\##{idx}" do
          @signal.receive?
          hobo
        ensure
          @wg.done
        end
      end
    end 
    
    private def transmit
      spawn do
        @signal.receive?
        @data.each do |item|
          @in.send item
        end
        @in.close
      end
    end
        
    private def gather
      spawn do
        @signal.receive?
        while result = @out.receive
          @results << result
          # p! @results                                                                                           
        end
      end
    end
    
    private def run
      @signal.close
      @wg.wait
    end 
    
    private def hobo
      while (arg = @in.receive?)
        result = aux?(arg)
        if result
          @out.send(result)
        else
          pp "*** timeout"
        end
        # Fiber.yield                                                                                             
      end
    end
    
    private def aux?(arg) : (OUT | Nil)
      spawn do
        @aux.send(@fn.call(arg))
      end
      # Fiber.yield                                                                                               
      select
      when result = @aux.receive?
        result
      when timeout(@wait.seconds)
        nil
      end
    end
  end
end


    
require "./fpool"

fn = ->(n : UInt32) {  n *= 2_u32  }

n = 10_000_000_u32

gizmo = Acme::FPool(UInt32, UInt32)
  .new(action: fn, list: [*1_u32..n], workers: 5_u8)

# pp! gizmo                                                                                      
pp! gizmo.frob
pp! gizmo.results[-1]

This crashes:

crystal build src/runp.cr -Dpreview_mt --no-debug --release

karl@rantanplan:~/src/crystal/futures/fpool$./runp
gizmo.results[-1] # => Unhandled exception in spawn(name: #1): Cannot allocate new fiber stack: Cannot allocate memory (RuntimeError)
  from ./runp in '??'
  from ./runp in '??'
  from ./runp in '??'  from ./runp in '??'
  from ./runp in '??'
  from ???
399968

But this works:

crystal build src/runp.cr --no-debug --release

karl@rantanplan:~/src/crystal/futures/fpool$ time ./runp 

gizmo.frob # => 10000000
gizmo.results[-1] # => 20000000

real    0m3.941s
user    0m3.740s
sys     0m0.200s

You’re running out of memory (OOM). Running top shows the issue: without MT the program allocates at most 550MB of virtual memory (VIRT) and 430MB of resident memory (RSS); with MT it tries to allocate over 4TB of virtual memory (VIRT) and over 2.3GB of actual memory (RSS). It passes on my computer, but I have plenty of RAM.

The issue is in #aux?, which spawns a fiber on every iteration of the #hobo loop: it balloons and creates far too many fibers.

Why is only MT impacted? It might be because ST has a single stack pool to recycle the stacks, while MT has one per thread, and you might be unlucky: the released stacks end up in some of the stack pools, while new fibers are always created from a single one that has nothing to recycle.

I tested your example with the execution context shard for RFC 2 with a single stack pool per context, and it only allocates 660MB of VIRT and 390MB of RSS. I guess it confirms the issue.
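
To illustrate the point about #aux? above, here is a minimal sketch of the alternative: a fixed set of workers that call the proc directly, so no fiber is spawned per item. The helper name pooled_map and its concrete Int32 types are assumptions made for this example; it is not the original FPool and it drops the timeout handling.

```crystal
require "wait_group"

# Hypothetical helper, not from the thread: maps `action` over `items` with a
# fixed number of worker fibers. The proc is called inside the worker itself,
# so only `workers + 2` fibers are spawned, regardless of `items.size`.
def pooled_map(items : Array(Int32), workers : Int32, action : Proc(Int32, Int32)) : Array(Int32)
  jobs = Channel({Int32, Int32}).new(workers)
  done = Channel({Int32, Int32}).new(workers)
  wg = WaitGroup.new

  workers.times do
    wg.add
    spawn do
      # Each worker pulls {index, item} pairs until `jobs` is closed.
      while job = jobs.receive?
        index, item = job
        done.send({index, action.call(item)})
      end
    ensure
      wg.done
    end
  end

  # Feeder: pushes every item together with its position, then closes `jobs`.
  spawn do
    items.each_with_index { |item, index| jobs.send({index, item}) }
    jobs.close
  end

  # Closer: once all workers have finished, no more results can arrive.
  spawn do
    wg.wait
    done.close
  end

  # Collect on the calling fiber; the index restores the input order.
  results = Array(Int32).new(items.size, 0)
  while pair = done.receive?
    index, value = pair
    results[index] = value
  end
  results
end

doubled = pooled_map([*1..1000], 4, ->(n : Int32) { n * 2 })
p! doubled.size, doubled.last
```

Carrying the index along with each item avoids the final sort, at the cost of a slightly larger message per job.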


It might be because ST has a single stack pool to recycle the stacks, while MT has one per thread,

for RFC 2 with a single stack pool per context

I have noticed you said:

  • current preview_mt one, a stack pool per thread
  • new RFC2 one, a stack pool per context.

So does this mean that the M:N model used for goroutines in Go can be achieved in Crystal too?

Thanks.

If you create a lot of fibers, you will soon run out of memory.
So we have to reduce the number of spawns as much as possible.

As usual, I asked ChatGPT to write some code to work around this by splitting the data into batch sizes. I hope there are no bugs left.

require "wait_group"

class BatchProcessor(IN, OUT)
  @action : Proc(IN, OUT)
  @data : Array(IN)
  @workers : Int32
  @batch_size : Int32
  @results : Channel(Tuple(Int32, Array(OUT)))
  @wg : WaitGroup

  def initialize(action, data, workers = nil)
    @action = action
    @data = data
    @workers = [workers || ENV.fetch("CRYSTAL_WORKERS", "4").to_i, @data.size].min
    @batch_size = (@data.size + @workers - 1) // @workers # ceiling division: items per batch
    @results = Channel(Tuple(Int32, Array(OUT))).new(@workers)
    @wg = WaitGroup.new
  end

  def process
    spawn_batches
    @wg.wait
    @results.close
    collect_results
  end

  private def spawn_batches
    @data.each_slice(@batch_size).each_with_index do |batch, index|
      @wg.add
      spawn do
        processed_batch = batch.map { |item| @action.call(item) }
        @results.send({index, processed_batch})
      ensure
        # Always signal completion, even if the action raises,
        # so that @wg.wait cannot hang.
        @wg.done
      end
    end
  end

  private def collect_results
    batches = [] of Tuple(Int32, Array(OUT))

    while result = @results.receive?
      batches << result
    end

    batches.sort_by { |batch_index, _| batch_index }
      .flat_map { |_, batch| batch }
  end
end

double_fn = ->(n : UInt32) { n * 2_u32 }
total_items = 100_000_u32

processor = BatchProcessor(UInt32, UInt32).new(action: double_fn, data: [*1_u32..total_items])
pp! processor.process

What you want to do may be a little different than this, but I hope it helps.

I had assumed that the fibers spawned in the #hobo loop would be thrown away immediately, instead of checking more closely. That reduces the idea of an object pool to absurdity. I had a bad feeling about #aux? anyway. I can leave it out; I just need to think of something else for timeouts, which is why I did it that way in the first place. Possibly not my finest hour. Thanks for the great explanation.
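
One possible direction for the timeouts, sketched under assumptions: put the select with timeout on the receiving side of a result channel instead of spawning a fiber per item. The helper name receive_within is made up for this example. Note that this only bounds how long the receiver waits; as far as I know it does not interrupt a proc that is still running.

```crystal
# Hypothetical helper, not from the thread: waits for the next value on
# `results` and gives up after `limit`. Returns nil on timeout, or when the
# channel is closed and empty.
def receive_within(results : Channel(Int32), limit : Time::Span) : Int32?
  select
  when value = results.receive?
    value
  when timeout(limit)
    nil
  end
end

results = Channel(Int32).new(1)
spawn { results.send(42) }

p! receive_within(results, 1.second)        # the value sent above
p! receive_within(results, 50.milliseconds) # nothing left to receive: times out
```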

Thank you. Sending batches (to a pool) was the next plan. I had tried something similar in Common Lisp a while ago for learning purposes and for fun.
See lparallel and serapeum.

Little addendum:

def batches(l, c)
  s = (l.size/c).ceil.to_i
  l.each_slice(s).to_a
end

list = [*1..13]
p! batches list, 4
karl@rantanplan:~/src/crystal/futures$ ./batches
batches(list, 4) # => [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13]]
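
The same split can also be computed with integer arithmetic only, using the ceiling-division form from the BatchProcessor above. This is just a sketch: the name batches_int and the guard for an empty list are additions for this example.

```crystal
# Hypothetical variant of `batches`: splits `list` into at most `count`
# slices using integer ceiling division instead of Float#ceil.
def batches_int(list : Array(Int32), count : Int32) : Array(Array(Int32))
  # (size + count - 1) // count rounds up; Math.max keeps the slice size
  # at 1 or more so that each_slice does not raise on an empty list.
  size = Math.max((list.size + count - 1) // count, 1)
  list.each_slice(size).to_a
end

p! batches_int([*1..13], 4)
```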