Might be nice to have a convenience method Fiber#join
Just so we don’t have to have a “done channel” every time we want to learn when a fiber exits…
-roger-
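For context, the "done channel" idiom mentioned above looks roughly like this (a sketch, not code from the thread; names are illustrative):

```crystal
# Today's idiom: pair each spawn with a Channel and receive on it
# to learn when the fiber exits.
done = Channel(Nil).new

spawn do
  begin
    puts "hello from the fiber"
  ensure
    done.send nil # signal completion even if the body raises
  end
end

done.receive # blocks until the fiber signals completion
```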
I’d rather see a Fiber#exit?
What would exit? do? Check if it’s still running?
Yes
That’s Fiber#dead?
I think they want something that will pause the current fiber until the awaited one finishes. That would need some integration with the scheduler, and it would allow hooking into running fibers whose body is unknown or already set, where it is not possible to add a `done.send`.
Just an experiment, far from elegant:

- define a subclass of `Fiber` - let's call it `JoinableFiber` - where we:
  - initialise a `done` channel at creation time
  - define a `#join` method that receives on it
  - [the hacky bit] override `#run` with a copy-pasted version from `Fiber`, where we send to `done` just before invoking `Crystal::Scheduler.reschedule`
- redefine `spawn` to use the new `JoinableFiber` class.
class JoinableFiber < Fiber
  def initialize(@name : String? = nil, &@proc : ->)
    super(@name, &@proc)
    @done = Channel(Nil).new(1)
  end

  def join
    @done.receive?
  end

  def run
    GC.unlock_read
    @proc.call
  rescue ex
    if name = @name
      STDERR.print "Unhandled exception in spawn(name: #{name}): "
    else
      STDERR.print "Unhandled exception in spawn: "
    end
    ex.inspect_with_backtrace(STDERR)
    STDERR.flush
  ensure
    {% if flag?(:preview_mt) %}
      Crystal::Scheduler.enqueue_free_stack @stack
    {% else %}
      Fiber.stack_pool.release(@stack)
    {% end %}

    # Remove the current fiber from the linked list
    Fiber.fibers.delete(self)

    # Delete the resume event if it was used by `yield` or `sleep`
    @resume_event.try &.free
    @alive = false
    @done.send nil
    Crystal::Scheduler.reschedule
  end
end

def spawn(*, name : String? = nil, same_thread = false, &block)
  fiber = JoinableFiber.new(name, &block)
  if same_thread
    fiber.@current_thread.set(Thread.current)
  end
  Crystal::Scheduler.enqueue fiber
  fiber
end
Now you can run the following from your main, and see "hello" being printed!
spawn do
  puts "hello"
end.join
Note that:

- `done.send` will block until a receive is performed (mind if there are no joins executed)
- only one `join` will be awakened

But the monkey-patch extension is definitely a way to check whether the functionality is worth it. It can probably be applied directly to `Fiber`, without subclassing it.
`@done.send` won't block, as `@done` has capacity 1 - I should have highlighted that. This means that `JoinableFiber` will complete properly, whether or not `join` is invoked.
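A minimal illustration of that property of buffered channels (a sketch, not code from the thread):

```crystal
# A channel with capacity 1: the first send returns immediately
# even when nobody is receiving, because the value sits in the buffer.
ch = Channel(Nil).new(1)
ch.send nil # does not block
ch.receive  # drains the buffered value
```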
But that would mean any fiber that is not explicitly joined will linger indefinitely.
In general, I'd prefer a solution as suggested in [RFC] Structured Concurrency · Issue #6468 · crystal-lang/crystal · GitHub, which would make it possible to wait for a single fiber but offers far more features beyond that. For example, you can easily wait for n fibers, and you can control what happens when a fiber raises.
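For comparison, waiting for n fibers with today's primitives is typically done by counting completion signals on a shared channel (a sketch under that assumption; the RFC would make this nicer):

```crystal
# Wait for n fibers by receiving one completion signal per fiber.
done = Channel(Nil).new
n = 3

n.times do |i|
  spawn do
    begin
      puts "fiber #{i} running"
    ensure
      done.send nil
    end
  end
end

n.times { done.receive } # returns once all n fibers have exited
```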
I didn’t see it was a buffered channel. Sorry!
Hey folks, I feel like I’m missing something here.
- Why would a fiber that is not explicitly joined linger indefinitely? The channel has a capacity of 1, so the fiber will not block on `send`.

> For example you can easily wait for n fibers

Given a set of fibers you want to wait on, you'd just `fibers.each { |f| f.join }`. This is what you'd do in Ruby. The fact that we might be waiting on the fiber running the longest doesn't change the outcome. But maybe you wanted something different?
I don't think we should let users play with fibers. `spawn` should return `nil`. The way to communicate and coordinate fibers is by using `spawn` + `Channel` and any other concurrency stuff we build in the future.
It wasn't immediately clear to me how channels can be used for multiple listeners to know that a particular fiber has exited… especially if the fiber has already exited…
But you could write a wrapper that’s like
class FiberDone
  @already_done = false
  @listeners = [] of Channel(Nil)

  def fiber_done
    @already_done = true
    # notify all the listener channels
    @listeners.each &.send(nil)
  end

  def join(channel)
    return if @already_done
    @listeners << channel
  end
end
then the fiber you want to join is responsible for calling the `#fiber_done` method (in an `ensure`?). Something like this might be a good fit for the stdlib too? :)
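A possible usage sketch of that wrapper (assuming the "notify all the listener channels" step sends to each registered channel, e.g. `@listeners.each &.send(nil)`; names are illustrative):

```crystal
# Register a listener channel before spawning, signal in ensure.
tracker = FiberDone.new
listener = Channel(Nil).new
tracker.join(listener)

spawn do
  begin
    puts "doing work"
  ensure
    tracker.fiber_done # notifies every registered listener
  end
end

listener.receive # returns once the fiber has exited
```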
Sorry, that statement was based on @bcardiff’s comment, which is inaccurate. So don’t mind my comment either ;)
You can do that. But it’s inefficient. Fibers that finish quickly will block until a longer-running fiber in front of them has finished, when they could already release their stack memory.
I still don't see why that's the case, sorry. Fibers that finish quickly will just terminate, as far as I can see. But their stack memory will still be allocated? Probably just me not knowing how the internals work. I'll try to wrap my head around this, thanks
> Fibers that finish quickly will just terminate, as far as I can see.
Okay, I double checked your implementation and the stack would actually be released before that.
The issue would exist when the done channel was implemented inside the fiber’s proc.
This seems similar to what `future { ... }.get` does. At least, it's what I usually use it for. :-)
f = future { do_some_work }
do_some_other_work
f.get
Oh nice. And that uses channels behind the scenes. The only drawback is if you don’t control the fiber creation point you can’t as easily use it. But maybe enough for now, at least for my use case, thank you!
In retrospect Fiber could have like a list of channels it should send on when it “exits” (like listener joining/waiting channels) and accomplish #join reasonably easily. We’ll see if it’s ever necessary I suppose :)