Unmanaged coroutines?

I’ve noticed Crystal provides pseudo-threads under the name Fiber. I read about this feature and tried it out, and to my understanding, they are coroutines under the hood, but are scheduled automatically by the event loop from a runtime pool, in a manner reminiscent of JavaScript’s tasks. I also understand work is being done to allow them to actually be multithreaded in some capacity.

It is nice to know how I can have asynchronous concurrency, but is there a facility for synchronous concurrency, more like Ruby’s Fiber? That is, is there a way to make coroutines that are only ever scheduled via resume, never automatically?

Answering my own question, though I’m not sure if this is a perfect solution:

# Coroutine: Coroutine / generator class.
# TIn: Type which the coroutine can consume.
# TOut: Type which the coroutine can generate.
class Coroutine(TIn, TOut) < Fiber
  # initialize: Constructor expects a block that takes the instance
  # and the first TIn and returns the final TOut. Coroutine is not resumed
  # on construction.
  def initialize(&block : (Coroutine(TIn, TOut), TIn -> TOut))
    @inch = Channel(TIn).new(1)
    @outch = Channel(TOut).new(1)
    @block = block
    @callers = Array(Fiber).new # keep track of whom to yield back to
    super do
      @outch.send(@block.call(self, @inch.receive))
      @inch.close # let final calling fiber clean up @outch after polling it
      enqueue # trickery to let the fiber die
      @callers.pop.resume
    end
  end
  # NotResumableError: Raised on invalid resume.
  class NotResumableError < Exception
    def initialize(msg = "Coroutine not resumable")
      super
    end
  end
  # resume: Transfers control to the coroutine. Raises NotResumableError
  # if the coroutine is dead or already has control. The given TIn is returned
  # from the corresponding call to yield within the coroutine.
  # If this is the first resume, the given TIn is instead passed to the block
  # given at construction.
  def resume(value : TIn) : TOut
    raise NotResumableError.new if dead? || !resumable?
    @inch.send(value)
    @callers.push(Fiber.current)
    resume
    result = @outch.receive
    if @inch.closed?
      @outch.close # if we're done, clean up @outch after polling it
      until dead?; Fiber.yield; end # trickery to let the fiber die
    end
    return result
  end
  # yield: Returns control from the coroutine to the last caller of resume.
  # Given TOut is returned from the corresponding call to resume.
  def yield(value : TOut) : TIn
    @outch.send(value)
    @callers.pop.resume
    return @inch.receive
  end
end

# Usage example
coro = Coroutine(Int32, Int32).new do |coro, n|
  4.times do
    puts "in coro, n = #{n}"
    n = coro.yield(n + 1)
  end
  puts "end coro, n = #{n}"
  n + 1
end
n = 0
while true
  puts "in main, n = #{n}"
  begin
    n = coro.resume(n + 1)
  rescue Coroutine::NotResumableError # bad practice, just for example
    puts "caught NotResumableError"
    break
  end
end

If there’s a built-in way to do this, I’d still be interested to hear it. It’s also possible Fibers already allow for this and I’m just misunderstanding them.

Giving my own question a better answer than before.

The API reference says this of Fiber#resume:

There are no provisions for resuming the current fiber (where this method is called). Unless it is explicitly added for rescheduling (for example using #enqueue) the current fiber won’t ever reach any instructions after the call to this method.

I missed this at first, and, even after noticing it, only thought of it as an annoyance that I had to work around. In actuality, this apparent limitation is a feature which provides the very functionality I was asking for and sought to implement.

Consider:

main = Fiber.current
n = 0
coro = spawn do
  4.times do
    puts "in coro, n = #{n}"
    n += 1
    main.resume
  end
end
4.times do
  puts "in main, n = #{n}"
  n += 1
  coro.resume
end

Thus, Fiber#resume allows us to use Fibers as unmanaged coroutines / resumable functions, and Fiber.yield allows us to use them as lightweight threads.
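For contrast, here is a minimal sketch of the thread-like side, assuming the default single-threaded runtime (the `order` array is just an illustrative name). Fibers created with spawn sit in the scheduler's queue and only get to run once the current fiber gives the scheduler a turn, here via Fiber.yield:

```crystal
order = [] of String

# spawn creates each fiber and enqueues it with the scheduler right away,
# but neither runs until the main fiber gives up control.
spawn { order << "a" }
spawn { order << "b" }

order << "main before yield"

# Fiber.yield hands control to the scheduler and asks to be resumed later,
# so the spawned fibers get their turn here.
Fiber.yield

order << "main after yield"
puts order.join(", ")
```

Nothing in this sketch names which fiber runs next; that choice is left entirely to the scheduler, which is exactly what Fiber#resume lets us avoid.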

EDIT 3: Consolidated the “wrong” answers into the collapsible below:

Several attempts to work around a problem that didn't exist

The only thing I can think of to look out for is this: Fibers are immediately enqueued upon construction. If the managing Fiber yields control to the scheduler with Fiber.yield after constructing another Fiber, but before ever handing control to it with Fiber#resume for the first time, the scheduler may select the constructed Fiber. That violates the usual assumption about unmanaged coroutines that they never run unless explicitly resumed. Depending on how we plan to use the constructed Fiber, this violation may sometimes be catastrophic.

A workaround, which is much more lightweight than my Channel-based approach but should be just as robust, is to have the body of every intended-function-like Fiber start with main.resume, and also immediately resume every intended-function-like Fiber upon construction:

coro = spawn do
  main.resume
  # ...
end
coro.resume

This will make the Fiber do nothing for its first run but remove itself from the scheduler’s pool and hand control back. For an intended-thread-like Fiber, this is obviously not ideal, so we can just not do that for them.

EDIT: Upon typing this out, I was immediately suspicious of it. Turns out only Fiber.yield can remove a competing Fiber from the scheduler’s pool; Fiber#resume cannot. So my suggestion in the second code block doesn’t work:

main = Fiber.current
coro = spawn do
  main.resume
  puts "shouldn't happen"
end
coro.resume
Fiber.yield # prints "shouldn't happen"

To truly protect an intended-function-like Fiber from behaving as thread-like, we have to do this instead:

main = Fiber.current
started = false
coro = spawn do
  started = true
  main.resume
  puts "shouldn't happen"
end
until started; Fiber.yield; end

This means constructing an intended-function-like Fiber, even before ever resuming it, always forces the calling Fiber to yield to the scheduler at that point. I’m not crazy about that, but it’s probably okay.

We can wrap this in a function:

def coroutine(&block)
  caller = Fiber.current
  started = false
  coro = spawn do
    started = true
    caller.resume
    block.call
  end
  until started; Fiber.yield; end
  return coro
end

EDIT 2: Ok, final answer:

coro = Fiber.new do
  puts "shouldn't happen"
end
Fiber.yield # prints nothing

If Fibers are constructed with Fiber.new instead of with spawn, they are never enqueued in the first place. My bad, it’s been as simple as that all along.
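Putting the pieces together, here is a minimal sketch of the ping-pong pattern built on Fiber.new, so the coroutine only ever runs when explicitly resumed. The `log` array and the loop counts are illustrative choices of mine, not anything the API requires:

```crystal
main = Fiber.current
log = [] of String

# Fiber.new creates the fiber without enqueueing it with the scheduler.
coro = Fiber.new do
  3.times do |i|
    log << "coro #{i}"
    main.resume # hand control straight back to the main fiber
  end
end

# Yielding to the scheduler does not start coro, since it was never enqueued.
Fiber.yield
log << "after yield"

3.times do |i|
  log << "main #{i}"
  coro.resume # run coro until its next main.resume
end

puts log.join(", ")
# prints: after yield, main 0, coro 0, main 1, coro 1, main 2, coro 2
```

One thing to watch, as far as I can tell: the coroutine's block should always finish a turn by resuming its caller. If the block simply ran off its end, control would go to the scheduler, and the main fiber was never enqueued, so nothing would bring it back.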