The Crystal Programming Language Forum

MT and Mutex

I’ve been playing around with MT and I have to say it was not a relaxed ride, deadlocks, shared objects misbehaving, and the worst thing which is strange behavior of the schedular queue.

I might be missing a few “understandings” about MT that might be the root cause of this bad behavior.

Mutex.synchronize will that stop the world or just promise that a single Fiber will be handling the blocked code at a single time?

What happened if I synchronize inside another synchronize ?

Very few parts of the std-lib is thread safe. For now the core aspect for communication have been worked: channels, select, scheduler.

Any shared data would need to be synchronized manually.

I’m not sure if the issues you present are from the std-lib or the scheduler itself.

Mutex#synchronize will not stop the world. But ensure there is only one fiber at a time in it. In 0.31.1 it is reentrant so synchronize inside synchronize will pass (and will need to be unlocked twice also). In master there are some current changes/discussions whether it should or not remain reentrant or not. In master currently, is not. https://github.com/crystal-lang/crystal/pull/8295

Is there some more insight to narrow down the uphills of your journey?

2 Likes

Well, I think that there are a few gotchas that I fell for
I used this pattern quite allot before

done = 0
[Object1, Object2, Object3].each do |runnable|
  spawn do
    runnable.run
    done += 1
  end
end

while done < [Object1, Object2, Object3].size
  sleep 1
end

I used this to control and manage multiple Fibers running.

Now, it seems like it should still work, but there are a few issues now.
done is not MT safe now, the code should now be changed to

lock = Mutex.new
done = 0
[Object1, Object2, Object3].each do |runnable|
  spawn do
    runnable.run
    lock.synchronize do
      done += 1
    end
  end
end

while done < [Object1, Object2, Object3].size
  sleep 1
end

Only now using Mutex this is “safer”, why “safer” and not “safe”? because in

while done < [Object1, Object2, Object3].size
  sleep 1
end

We can’t be sure there isn’t a Thread out there touching the done mid test.
even using the lock like

lock.synchronize do
  while done < [Object1, Object2, Object3].size
    sleep 1
  end
end

Won’t help us, because those two are guard different parts of the code, so it’s not protecting the done it’s protecting the specific action of += .

The above was only realized today TBH, and already helped me stabilize the code a bit, but it’s hard to think about all those small nuances :)

Also, I’ve been bitten by the notion that Hash and Array are not thread safe, while everything is basically not Thread safe, so Mutex and smarts needs to be used in abundance :)

One more thing I find odd, why do we need to initialize Mutex each time, why not just have some global Mutex initialized for every crystal program, and then you can just use a macro for

synchronize do

end

lock
....
unlock

I’d suggest to use a channel to signal finished fibers. Channel is already thread safe so you don’t need to do anything special on your site to implement it for MT.

Because that would make it a global mutex. It’s better to have individual mutexes for individual synchronization tasks to be able to synchronize them individually.

1 Like

This code:

done = 0
[Object1, Object2, Object3].each do |runnable|
  spawn do
    runnable.run
    done += 1
  end
end

while done < [Object1, Object2, Object3].size
  sleep 1
end

must be rewritten to avoid sharing done between multiple fibers. This was OK before MT but in MT it’s not, and it’s the reason why we weren’t sure introducing MT was such a good idea because it breaks code and it makes programming much harder because you have to be careful not to share state.

To program it with MT in mind you have to try to use Channel first. What you want to do is very typical: do N things and wait for them all. There’s a common pattern for this:

runnables = [Object1, Object2, Object3]
done_channel = Channel(Nil).new

runnables.each do |runnable|
  spawn do
    runnable.run
    done_channel.send(nil)
  end
end

runnables.size.times do
  done_channel.receive
end

Channel is the type that you can use inside spawn to synchronize things. In fact it’s the only type that lets you do that so you have to think of ways to use it in your benefit.

In Go this is the same except that they have a couple of other types that we are currently missing, but they’ll come with time. One such type is WaitGroup. Eventually we’ll add it to Crystal and it’ll be like this:

runnables = [Object1, Object2, Object3]
wait_group = WaitGroup.new(runnables.size)

runnables.each do |runnable|
  spawn do
    runnable.run
    wait_group.done
  end
end

wait_group.wait

In fact we could add it to the std library right now using Channel as an implementation, though a more efficient implementation doesn’t involve channels.

4 Likes

Yeha, as I said only Today I realized my mistake, and started using channels for doing the same pattern as you just showed, a WaitGroup will be amazing! this will 100% make my life easier

A wait group already exists, it’s just called something else. It differs from other count down latch and wait group implementations in the following ways:

  • Allows error signaling to any waiting fibers.
  • Allows setting the wait count after initialization and in the middle of other fibers decrementing the counter.
  • Allows resetting after successfully waiting to reuse the object and avoid GC overhead.

@bararchy your first use example fits a wait group perfectly rather than using channels.

The documentation is slightly out of date. It handles multiple waiters.

Tested with and without -Dpreview_mt.

2 Likes

Interesting, I will check this out, have you benchmarked it against current preview_mt with Channel based implement?

Yes. Results depend on the number of fibers counting down. When n is small (1-2) it’s almost the same as a Channel for the current implementation. When n is larger (> 1000) it’s significantly faster.

Internally it uses a single atomic decrement for every count_down except the last. The last uses a size 1 Channel for signaling because it’s the only non moving target for crystal synchronization that doesn’t require touching the scheduler or fiber implementation.

I have plans to optimize it further but more concurrency primitives are needed.

Related discussion ist the RFC about structured concurrency. It’s a proposal that fits for use cases like a waitgroup but is actually more versatile.

Although structuring the program with wait groups or channels is at the end nicer and more flexible, the other alternative is to use Atomic(Int32) instead of Int32. Using done.add(1) and done.get will work in this example.

That will end up in big chunks of code that will be prevented to execute simultaneously. Leading to less performant code. Usually, you need to minimize the amount of critical code. Using communication instead of shared memory is a way to achieve that.

@bcardiff Which way i more performant? or more “proper” Channels or Atomic counter? (for this style)

If you only need to sync the counter Atomic should be more performant AFAIK. Atomic operations can be translated directly to a couple of native operations while channels use among other things a SpinLock that uses Atomic.

But synchronizing across Channel, or via messages in general is more flexible than sharing memory.

If you need to sync a counter and wait for completion then a wait group (latch) if fastest. Otherwise what @bcardiff said.

Could you elaborate on “wait group”?
This is new territory for me, and I have a hard time finding things mentioned here in the documentation.

Also, why is Atomic(Type) thread safe? Is that inherent in it being atomic? or is that because the class has been made with MT in mind?

A wait group is essentially a counter tracking task state. When you add a task, the counter increases. When the task completes, the counter decreases. As soon as the counter has reached zero, you know that all tasks have completed.
There is currently no Waitgroup implementation in Crystal. But you should be able to find lots of documentation for Golang.

It’s an inherent property to be thread safe. Otherwise there could be no guarantee that operations are really atomic when two threads try to access simultaneously.