Concurrency question (benchmark example)

Hi

I decided to start trying to understand Crystal concurrency (in light of the preview_mt feature) and had a look at this: crystal-benchmarks/channel-primes.cr at master · bcardiff/crystal-benchmarks · GitHub

SIZE = ARGV[0].to_i

def generate(chan)
  i = 2
  loop do
    chan.send(i)
    i += 1
  end
end

I understand this. Takes a channel and just writes increasing numbers to it.

def filter(in_chan, out_chan, prime)
  loop do
    i = in_chan.receive
    if i % prime != 0
      out_chan.send(i)
    end
  end
end

I also understand this. It reads from one channel and sends what it read to another channel when it’s not divisible by “prime”.

ch = Channel(Int32).new
spawn generate(ch)

Start fiber (or thread) with generator. Got that.

SIZE.times do
  prime = ch.receive
  puts prime
  ch1 = Channel(Int32).new
  spawn filter(ch, ch1, prime)
  ch = ch1
end

Now here I’m completely lost.
SIZE is the number of primes to compute. So this spawns, let’s say, 15 fibers, which all read from the channel “ch”, which carries either the generated numbers or the output of a filter function?

  • Why is this not random? If we have multithreading, why do things still happen in a defined order?

  • The filter function does this in a loop without an end condition. Why does this even work, i.e. ever end?

If I understand correctly (both your code and the implementation of Channel), channels provide some level of order in concurrent code. For example, if 3 different fibers all call receive on the same channel, they will get values in the order in which they called receive.

This happens because the receivers waiting on a channel are held in a queue (implemented as a linked list). When you call receive and there aren’t any values in the channel buffer, the receiving fiber gets pushed into the end of the queue.

So the fibers waiting to send, the fibers waiting to receive, and the values buffered in the channel are each kept in a separate FIFO queue. The first sending fiber will succeed in sending the first value to the first receiving fiber every single time.
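Here is a minimal sketch of that ordering. The method name receive_in_order and the results channel are made up for illustration. It assumes a single-threaded build (no -Dpreview_mt), where Fiber.yield lets the three fibers block on receive in spawn order before the main fiber sends anything; in that case I would expect fiber 0 to get the first value, fiber 1 the second, and so on.

```crystal
# Hypothetical helper: three fibers call receive on the same channel,
# then the main fiber sends three values and collects who got what.
def receive_in_order : Array(Tuple(Int32, Int32))
  ch = Channel(Int32).new
  # Buffered, so the worker fibers never block when reporting back.
  results = Channel(Tuple(Int32, Int32)).new(3)

  3.times do |id|
    spawn do
      # Each fiber waits on ch and reports {fiber id, value received}.
      results.send({id, ch.receive})
    end
  end

  # Give the spawned fibers a chance to run and queue up on ch.receive.
  Fiber.yield

  # Each send is handed to the receiver at the front of the waiting queue.
  3.times { |n| ch.send(n * 10) }

  Array.new(3) { results.receive }
end

receive_in_order.each { |pair| puts "fiber #{pair[0]} got #{pair[1]}" }
```

With multithreading enabled, the order in which the fibers reach receive is no longer guaranteed, so the pairing can differ, but every value is still delivered exactly once.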

It looks like once the main fiber has received SIZE values (one prime per loop iteration, each from the current end of the filter chain that starts at generate), it completes. When the main fiber completes, the entire program exits. For example, the following code will exit despite work still being done in another fiber:

spawn { loop {} }

If all of your application’s work is being done in other fibers, you may need to put the main fiber to sleep indefinitely:

spawn { loop {} }

sleep # Let the other fiber do its work

This happens in multithreaded code, too. If the main thread is done, the entire program exits. This is different from Node.js, for example, where if any setTimeout callbacks are still enqueued, the app will continue until there are none left to execute.
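If the other fiber's work does finish at some point, an alternative to sleeping indefinitely is to have the main fiber block on a channel until the worker reports back. A minimal sketch, where run_and_wait and the summing "work" are placeholders I made up:

```crystal
# Hypothetical helper: runs some work in another fiber and blocks the
# calling fiber until the result arrives.
def run_and_wait : Int32
  done = Channel(Int32).new

  spawn do
    total = (1..10).sum # stand-in for real work
    done.send(total)    # hand the result back and wake the waiting fiber
  end

  # Blocks here, so the program cannot exit before the work is done.
  done.receive
end

puts run_and_wait
```

This is the same mechanism the sieve relies on: the main fiber only makes progress when ch.receive returns, and the program ends as soon as the main fiber's loop is finished, even though the generate and filter fibers are still looping.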

A nice way to explain the sieve example is with the graphical representation of the same algorithm in Go.

https://divan.dev/posts/go_concurrency_visualize/#concurrent-prime-sieve

https://divan.dev/demos/primesieve/

I think you are missing that ch changes on each iteration of the loop (ch = ch1). So the main program will wait to receive the first number that survives all the filters spawned so far.
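To make the rebinding visible, here is a sketch of the same program wrapped in a method that returns the primes instead of printing them. first_primes and next_ch are names I introduced; generate and filter are the ones from the original post. Note that every channel has exactly one sender and one receiver at any time, which is why the result is not random.

```crystal
def generate(chan)
  i = 2
  loop do
    chan.send(i)
    i += 1
  end
end

def filter(in_chan, out_chan, prime)
  loop do
    i = in_chan.receive
    out_chan.send(i) if i % prime != 0
  end
end

# Hypothetical wrapper around the loop from the original post.
def first_primes(count : Int32) : Array(Int32)
  ch = Channel(Int32).new
  spawn generate(ch)
  primes = [] of Int32

  count.times do
    # ch is the end of the chain: generate -> filter(2) -> filter(3) -> ...
    # so the first value to arrive here has passed every filter so far.
    prime = ch.receive
    primes << prime

    # Append a new filter to the chain. From now on only that filter
    # reads the old ch; the main fiber reads the filter's output.
    next_ch = Channel(Int32).new
    spawn filter(ch, next_ch, prime)
    ch = next_ch
  end

  primes
end

p first_primes(5) # => [2, 3, 5, 7, 11]
```

After the first iteration the main fiber no longer reads from the generator's channel at all. It reads from the output of filter(2), then from the output of filter(3), and so on.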
