Fiber#join

Might be nice to have a convenience method Fiber#join.
Just so we don’t need a “done channel” every time we want to know when a fiber exits…
-roger-

I’d rather see a Fiber#exit?

What would exit? do? Check if it’s still running?

Yes

That’s Fiber#dead?.

I think they want something that pauses the current fiber until the awaited one has finished. That would need some integration with the scheduler, and it would allow hooking into running fibers whose body is unknown or already set, where it is not possible to add a done.send.

Just an experiment, far from elegant:

  • define a subclass of Fiber - let’s call it JoinableFiber - where we
    • initialise a done channel at creation time
    • define a #join method that receives on it
    • [the hacky bit] override #run with a copy-pasted version from Fiber, where we send to done just before invoking Crystal::Scheduler.reschedule
  • redefine spawn to use the new JoinableFiber class.
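class JoinableFiber < Fiber
  def initialize(@name : String? = nil, &@proc : ->)
    super(@name, &@proc)
    # Capacity 1: the exiting fiber can always send, even if nobody ever joins it
    @done = Channel(Nil).new(1)
  end
  # Blocks the caller until this fiber has finished. Only one value is ever
  # sent, so only one join call will return.
  def join
    @done.receive?
  end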
  def run
    GC.unlock_read
    @proc.call
  rescue ex
    if name = @name
      STDERR.print "Unhandled exception in spawn(name: #{name}): "
    else
      STDERR.print "Unhandled exception in spawn: "
    end
    ex.inspect_with_backtrace(STDERR)
    STDERR.flush
  ensure
    {% if flag?(:preview_mt) %}
      Crystal::Scheduler.enqueue_free_stack @stack
    {% else %}
      Fiber.stack_pool.release(@stack)
    {% end %}
    # Remove the current fiber from the linked list
    Fiber.fibers.delete(self)

    # Delete the resume event if it was used by `yield` or `sleep`
    @resume_event.try &.free

    @alive = false
    # Signal a pending (or later) join before handing control back to the scheduler
    @done.send nil
    Crystal::Scheduler.reschedule
  end
end
# Redefine the top-level spawn so that every new fiber is a JoinableFiber
def spawn(*, name : String? = nil, same_thread = false, &block)
  fiber = JoinableFiber.new(name, &block)
  if same_thread
    fiber.@current_thread.set(Thread.current)
  end
  Crystal::Scheduler.enqueue fiber
  fiber
end

Now you can run the following from your main program and see “hello” being printed!

spawn do
  puts "hello"
end.join

Note that:

  • done.send will block until a receive is performed (watch out for fibers that are never joined)
  • only one join will be awakened

Still, the monkey-patch extension is definitely a good way to check whether the functionality is worth having. It could probably be applied directly to Fiber, without subclassing it.

@done.send won’t block, as @done has capacity 1 (I should have highlighted that).
This means that JoinableFiber will complete properly whether or not join is invoked.
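A quick way to see the difference between the two claims above, sketched with the standard Channel and Fiber#dead? APIs and assuming the default single-threaded scheduler (with -Dpreview_mt the timing is not guaranteed): a sender on a capacity-1 channel finishes without any receiver, while a sender on an unbuffered channel stays parked inside send.

```crystal
# Capacity 1: a single send completes even if nobody ever receives.
buffered = Channel(Nil).new(1)
# Capacity 0 (the default): send waits until another fiber receives.
unbuffered = Channel(Nil).new

buffered_sender = spawn { buffered.send(nil) }
unbuffered_sender = spawn { unbuffered.send(nil) }

# Let both spawned fibers run before checking on them.
Fiber.yield

puts buffered_sender.dead?   # => true: it finished without a receiver
puts unbuffered_sender.dead? # => false: it is still blocked in send

# Take the value so the blocked sender can finish too.
unbuffered.receive
```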

But that would mean any fiber that is not explicitly joined will linger indefinitely.

In general, I’d prefer a solution like the one suggested in [RFC] Structured Concurrency (crystal-lang/crystal issue #6468), which would make it possible to wait for a single fiber but offers far more features beyond that. For example, you can easily wait for n fibers, and you can control what happens when a fiber raises.

I didn’t notice it was a buffered channel. Sorry!

Hey folks, I feel like I’m missing something here.

  • Why would a fiber that is not explicitly joined linger indefinitely? The channel has a capacity of 1, so the fiber will not block on send.

“For example you can easily wait for n fibers”

Given a set of fibers you want to wait on, you’d just do fibers.each { |f| f.join }. This is what you’d do in Ruby. The fact that we might first be waiting on the longest-running fiber doesn’t change the outcome. But maybe you had something different in mind?
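For comparison, waiting for n fibers does not strictly need a per-fiber join. A minimal sketch, assuming the workers can share one buffered channel: the waiter receives n completion signals in whatever order the workers finish. This is a plain channel pattern, not the structured-concurrency design from the RFC.

```crystal
n = 5
# Room for every worker's signal, so no worker ever blocks on send.
done = Channel(Int32).new(n)

n.times do |i|
  spawn do
    # Later workers sleep less, so they tend to finish first.
    sleep(((n - i) * 10).milliseconds)
    done.send(i)
  end
end

# Receive n times: signals arrive in completion order, not spawn order.
finished = Array(Int32).new(n) { done.receive }
puts finished.sort == (0...n).to_a # => true
```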

I don’t think we should let users play with fibers. spawn should return nil. The way to communicate and coordinate fibers is by using spawn + Channel and any other concurrency stuff we build in the future.

It wasn’t immediately clear to me how channels can be used to let multiple listeners know that a particular fiber has exited… especially if the fiber has already exited…

But you could write a wrapper like this:

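class FiberDone
  @already_done = false
  @listeners = [] of Channel(Nil)

  # Called by the fiber being joined, e.g. from an ensure block.
  def fiber_done
    @already_done = true
    # Notify all the listener channels: closing wakes every receiver without blocking.
    @listeners.each &.close
    @listeners.clear
  end

  # Registers a listener; its channel is closed once the fiber is done
  # (immediately, if the fiber has already finished).
  def join(channel : Channel(Nil))
    if @already_done
      channel.close
    else
      @listeners << channel
    end
  end
end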

Then the fiber you want to join is responsible for calling the #fiber_done method (in an ensure block?). Something like this might be a good fit for the stdlib too? :)
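One way to get the multi-listener behaviour without keeping a listener list is to close a channel when the fiber exits instead of sending on it: receive? on a closed channel returns nil right away, for any number of waiters and for waiters that show up after the fact. This is a minimal sketch of that swapped-in technique, not the wrapper above; the worker’s sleep only exists so the listeners are already waiting when the channel is closed.

```crystal
done = Channel(Nil).new

spawn do
  begin
    sleep 10.milliseconds # simulated work, so the listeners start waiting first
    puts "worker: finished"
  ensure
    done.close # wakes every fiber blocked in done.receive?
  end
end

acks = Channel(Int32).new(3)
3.times do |i|
  spawn do
    done.receive? # returns nil once `done` is closed
    acks.send(i)
  end
end

received = Array(Int32).new(3) { acks.receive }
puts received.sort # => [0, 1, 2]

# A listener arriving after the worker has exited does not block.
puts done.receive?.nil? # => true
```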

Sorry, that statement was based on @bcardiff’s comment, which is inaccurate. So don’t mind my comment either ;)

You can do that. But it’s inefficient. Fibers that finish quickly will block until a longer-running fiber in front of them has finished, when they could already release their stack memory.

I still don’t see why that’s the case, sorry. Fibers that finish quickly will just terminate, as far as I can see. Or will their stack memory still be allocated? Probably just me not knowing how the internals work. I’ll try to wrap my head around this, thanks!

“Fibers that finish quickly will just terminate, as far as I can see.”

Okay, I double-checked your implementation, and the stack would actually be released before that.

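The issue would exist if the done channel were implemented inside the fiber’s proc: with an unbuffered channel, each fiber would stay blocked on its final send, stack included, until someone joins it.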

This seems similar to what future { ... }.get does. At least, it’s what I usually use it for. :-)

f = future { do_some_work }

do_some_other_work

f.get
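A rough equivalent of the future snippet written with a plain channel, in case you want the same shape without future. do_some_work is a hypothetical stand-in, and forwarding the exception through the channel is an assumption about how you might want errors handled, not a description of how future works internally.

```crystal
# Hypothetical stand-in for some slow computation.
def do_some_work : Int32
  sleep 10.milliseconds
  42
end

# Capacity 1, so the worker can finish even if nobody ever receives.
result = Channel(Int32 | Exception).new(1)

spawn do
  begin
    result.send(do_some_work)
  rescue ex
    # Hand the error to the waiter instead of letting the fiber just print it.
    result.send(ex)
  end
end

puts "doing some other work" # stands in for do_some_other_work

value = result.receive # plays the role of f.get
raise value if value.is_a?(Exception)
puts value # => 42
```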

Oh nice. And that uses channels behind the scenes. The only drawback is that you can’t use it as easily if you don’t control where the fiber is created. But maybe it’s enough for now, at least for my use case. Thank you!

In retrospect, Fiber could keep a list of channels to send on when it “exits” (the channels of listeners waiting to join it), which would make #join reasonably easy to accomplish. We’ll see if it’s ever necessary, I suppose :)