First steps with spawn | -Dpreview_mt behaviour

Hi all! I’m trying to understand how spawn works, with and without -Dpreview_mt.
My exercise consists of counting the lines of a text file, using multiple spawns in a “rat race” fashion, with one of them reaching EOF before the others.

wc -l words.txt
466550 words.txt
nb_thread = 8

channel = Channel(Int32).new(nb_thread)
mutex = Mutex.new # (:unchecked)
eof_reached = Atomic.new(0)
file = "words.txt"

File.open(file) do |file|
  nb_thread.times do |nb_num|
    spawn do
      count = 0
      while eof_reached.get == 0
        begin
          mutex.lock
          line = file.read_line
          mutex.unlock
          count += 1
          Fiber.yield
        rescue ex : IO::EOFError
          eof_reached.set(1)
          mutex.unlock
        end
      end
      channel.send(count)
    end # spawn
  end   # times

  total = 0
  nb_thread.times do
    recv = channel.receive
    puts recv
    total += recv
  end
  puts total
end # open

So, without -Dpreview_mt I get

58318
58318
58319
58319
58319
58319
58319
58319
466550

Result is OK.
Now, whether or not I comment out the

Fiber.yield

and run the same code with -Dpreview_mt, I get

59158
60351
60162
57913
58523
55017
59871
55579
**466574**

Furthermore, the result changes every time I run the code.

count is local to the “thread”,
line = file.read_line is protected by a Mutex,
eof_reached is an Atomic,
so what am I missing?

Edit:
When leaving Fiber.yield in place, the result is sometimes OK, often wrong, but better than without it.

I get results like

466550
466553
466554

Edit 2:
This was wrong:
channel = Channel(Int32).new(nb_thread)
Replaced with
channel = Channel(Int32).new

Results are better, but still 1, 2, or 3 more lines than expected.

What were you expecting?

As long as all fibers are in a single thread, there’s not much variance because they’re all running at the same pace, managed by Crystal’s scheduler.
-Dpreview_mt spawns multiple threads and distributes fibers across them. But how much time each thread gets to run is managed by the operating system. They’re OS threads. So some may get more iterations and others fewer in the same amount of time.

If I hazard some guessing, with my limited knowledge of fibers and threads:

As I understand it, fibers are cooperative multi-tasking, so you only ever have one fiber running, and fibers are switched at well-defined places. Those places would be Fiber.yield and inside read_line (as it’s an IO operation). So, when a fiber gets scheduled after the yield, it has a free run until the read_line, where no other fiber can gain control, because by then the mutex is locked.

Threads, on the other hand, are parallel, and while I can’t pinpoint where the ghost lines come from, I’d wager it’s caused by different threads hitting the shared variables at just the right offset to misbehave.
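
To illustrate the cooperative part (a minimal sketch, not from the original post): with plain fibers on a single thread, control only changes hands at switch points such as Fiber.yield, so the two fibers below take turns predictably.

# Single thread, two fibers: each runs until it hits Fiber.yield,
# then the scheduler hands control to the next runnable fiber.
spawn do
  3.times { |i| puts "fiber A, step #{i}"; Fiber.yield }
end

spawn do
  3.times { |i| puts "fiber B, step #{i}"; Fiber.yield }
end

# crude way to let the fibers finish before the program exits
sleep 100.milliseconds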

What I expected is

rescue ex : IO::EOFError
eof_reached.set(1)

to be executed on one thread, allowing all the threads to terminate…

When adding some logs, I can see that EOF is reached on each and every thread, whereas it shouldn’t be, because of the

eof_reached.set(1)

There’s something about

file.read_line

that I don’t get when running in “parallel”.

OK, so

  • the mutex is locked
  • read_line raises because of EOF
  • the rescue executes, eof_reached is set, the mutex is unlocked (that’s needed because it’s still locked)
  • so the other threads that were waiting are now able to read_line
  • this will also raise, because now we are at EOF

I’m still lost in the tracking but… that’s a first step.

Between when you check that the atomic is still zero and when you hit the mutex, other threads can run. Then other threads can also run after the exception was caught since there’s nothing in the algorithm making it exclusive to one thread.

Basically, your algorithm isn’t thread safe ^_^;
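
As an aside, one way to close that window (a minimal sketch reusing the variables from the code above; it does not address the platform issue discussed further down) is to do the EOF check and the read under the same lock, and to use gets, which returns nil at EOF instead of raising:

# Hypothetical rework of the worker fiber: the EOF check and the read
# happen inside the same critical section, and gets returns nil at EOF
# instead of raising, so count can only grow for lines actually read.
spawn do
  count = 0
  loop do
    line = nil
    mutex.synchronize do
      line = file.gets unless eof_reached.get == 1
    end
    if line
      count += 1
    else
      eof_reached.set(1)
      break
    end
  end
  channel.send(count)
end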


But what I don’t get is how the total can be different. Even if multiple threads do the read_line after EOF has been read, it should raise and thus not increment the counter.

That’s a hint! Thanks!

I don’t get it either, but there’s surely a good reason… but which? I’ll try to investigate later, because there’s something to understand here.

Another thought, taking your insight: I might end up with a diff of 1 per thread, because on one iteration a thread will raise, and between the raise and its handling the other threads get one turn to do count += 1.

So I added

count += 1 unless eof_reached.get == 1

And I’m still seeing a diff going up to 14.

Puzzling… parallelism is hard ;-)

Ok, it’s getting on my nerves ;-)

You can say that until EOF is reached, the whole code is OK:
file.read_line is guaranteed not to be interrupted, thanks to the mutex.lock/unlock.

Now, let’s say we have one thread about to win the mutex and hit EOF just after.
Let’s call this thread TW, and Tn all the others.

TW                                        ; Tn
mutex.lock                                ;
TW continues                              ; Tn on hold on mutex.lock
line = file.read_line                     ; Tn on hold
EOF is raised, count += 1 not reached     ; Tn on hold
rescue ex : IO::EOFError                  ; Tn on hold
eof_reached.set(1)                        ; Tn on hold
mutex.unlock                              ; Tn follows the same path as the left column
jump to the while eof_reached.get check   ; line = file.read_line
leave the loop                            ; EOF is raised, same process as the left column, count += 1 not reached

So I don’t see how this code would not be thread safe

Indeed. That’s why I’m staying away from threading and sticking with fibers as long as I can. Fibers have gotchas too, but their more well-defined serial nature makes them much easier to reason about. So as long as I don’t need the performance of true parallelism, it’s all good. It helps when you’re used to languages that haven’t finished bootstrapping by the time Crystal has done all the work. :grin:

@aboulafia There’s indeed nothing wrong with your code. It executes just as expected on x86.

Is your CPU an ARM or AArch64 architecture, like an Apple M1 or a Raspberry Pi? Then the problem is that Mutex is broken on these, because we improperly use atomics (it just happens to work on x86). @jgaskins has a pull request in review with a quick fix, and I started working on getting these fixed with the best balance of correctness and speed.


Yes! My CPU is an M2.

Glad to know, because I didn’t understand what was going on… Thanks!

Edit :

I should have searched the GitHub issues, because this works fine and I don’t get wrong results anymore:

class Crystal::SpinLock
  def lock
    previous_def
    ::Atomic::Ops.fence :sequentially_consistent, false
  end

  def unlock
    ::Atomic::Ops.fence :sequentially_consistent, false
    previous_def
  end
end

mutex = Crystal::SpinLock.new

Crystal::SpinLock is actually fine (it was fixed a while ago), but you mustn’t use it, since it will block the current thread (and all threads waiting on the lock), which will prevent other fibers from running.

The problem is in Mutex#unlock: it should call @state.set(0) instead of @state.lazy_set(0) or at least set a barrier. So this monkey-patch should work:

class Mutex
  def unlock
    ::Atomic::Ops.fence :sequentially_consistent, false
    previous_def
  end
end

It’s true that on relatively small text files I often see one thread doing 100% of the job and the others 0%.

On far bigger files, the dispatching is better, but still not even among threads (some still do only a few %).

I’ll try your solution.

Thanks!

TL;DR: Set CRYSTAL_WORKERS=8 (or however many CPU cores you have) in your shell environment before running your program and make sure you are consecutively spawning all of the fibers doing this work, with no other spawn calls between them.


There could be a couple explanations for this. The lowest-hanging fruit is setting the CRYSTAL_WORKERS environment variable to set the number of threads to use — the default number of worker threads is 4. Set this when running the program (its value at compile time is not important):

CRYSTAL_WORKERS=8 bin/my_program

The other possible explanation for it not being even is that, depending on the pattern used in spinning up fibers in your program, you may actually end up with an uneven distribution. Fibers are assigned to threads in a round-robin fashion, so fibers spawned at arbitrary times or interleaved with other spawn calls may not be evenly distributed.

For example, if you have 8 CPU cores, and you run this code, the fibers doing the work won’t be evenly distributed across all of those cores:

8.times do
  spawn do_some_work
  spawn track_the_work_being_done
end

The reason for this is that the do_some_work fiber will be assigned to threads 1, 3, 5, and 7 while track_the_work_being_done will be assigned to threads 2, 4, 6, and 8. This may not always be true (and in fact it may not be true anymore), but that’s how it worked the last time I read through the scheduler code.

Looking at the code you posted above, you seem to be doing it in a way that will spread the work across threads, but it’s worth mentioning just in case.
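
For example (a hypothetical rearrangement of the snippet above, using the same placeholder helpers), spawning each group of fibers back to back keeps the round-robin assignment even:

# All the workers are spawned consecutively, so round-robin assignment
# spreads them across distinct threads; the trackers follow afterwards.
8.times { spawn do_some_work }
8.times { spawn track_the_work_being_done }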


Yep, I used this ENV var in each and every test I did, as
crystal build t1.cr --release -Dpreview_mt
then
CRYSTAL_WORKERS=8 ; time ./t1

Still a lot to investigate though.

Still experimenting with -Dpreview_mt.

Let’s say I have a main fiber spawning worker fibers on demand.
How can the main fiber know when all of them have finished?

spawn do
  worker(channel, mutex, global_data, Slice.new(prev_pos, p8 - prev_pos))
end

I presume there must be some kind of solution like Go’s WaitGroup.
I looked at the docs, and also at Parallelism in Crystal - The Crystal Programming Language.

Edit: spawning on demand is not the best strategy; it’s better to have a fixed set of workers fed by channels. But my question still remains.
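
In case it helps, a common pattern (a minimal sketch with a hypothetical worker_count and do_work, not taken from this thread) is to use a Channel as a completion signal, similar in spirit to Go’s WaitGroup:

# Each worker sends one message when it finishes; the main fiber blocks
# until it has received one message per spawned worker.
worker_count = 4
done = Channel(Nil).new

worker_count.times do |i|
  spawn do
    do_work(i) # hypothetical work function
    done.send(nil)
  end
end

worker_count.times { done.receive } # every worker has finished here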