Concur: A collection of concurrency utilities leveraging Fibers and channels

I think I’m finally ready to share a shard I’ve worked on intermittently over the past two (2!) years.

Some of it still feels quite experimental, but I told myself there is no point in working on this any longer without getting some feedback.

So here it is: lbarasti/concur on GitHub, a collection of concurrency utilities for Crystal, leveraging Fibers and channels.

A TL;DR of what you’ll find in there:

  • Future: the wrapper for async computations you have been awaiting 🔮

    key = Future.new { "WOW" }
      .map { |v| v.downcase }
      .select { |v| v.size <= 3 }
      .recover { "default_key" }
      .await # => "wow"
    
  • A stream-processing-oriented API for Channel, with support for concurrent operations 🚀

    jobs = Channel(Job).new
    
    jobs
      .map(workers: 4) { |job| process(job) }
      .select(&.success?)
      .take(10) # => [job_i_output, ...]
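For a rough idea of what a concurrent `map(workers: 4)` over a channel involves, here is a sketch using only Crystal's standard library. It is not concur's implementation: `parallel_map` is a hypothetical name, results may arrive out of order, and it assumes the input never carries `nil` or `false` (a `receive?`-driven loop would mistake those for the end of the stream).

```crystal
# Hypothetical helper (not concur's API): fan values from `input` out to
# `workers` fibers, apply `block` to each, and emit results on a new channel.
# Results can arrive in any order.
def parallel_map(input : Channel(T), workers : Int32, &block : T -> U) : Channel(U) forall T, U
  output = Channel(U).new
  done = Channel(Nil).new

  workers.times do
    spawn do
      # Pull until `input` is closed and drained. Assumes no nil/false values.
      while value = input.receive?
        output.send(block.call(value))
      end
      done.send(nil)
    end
  end

  # Close `output` once every worker has finished, so consumers can stop.
  spawn do
    workers.times { done.receive }
    output.close
  end

  output
end

jobs = Channel(Int32).new
spawn do
  (1..10).each { |i| jobs.send(i) }
  jobs.close
end

results = [] of Int32
squares = parallel_map(jobs, workers: 4) { |i| i * i }
while square = squares.receive?
  results << square
end
puts results.sort # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

The extra "closer" fiber is what lets consumers use a plain `receive?` loop: the output channel is closed only after all workers have reported in.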
    

Use cases

I’m thinking Future might be useful in web applications, to wrap API calls or long-running computations, while the stream-processing API could come in handy for prototyping or for writing simulations.

More importantly, what do you think this could help you build? What do you feel is missing, or superfluous?


Damn, those streams look pretty neat! I’ve been thinking about what channel generators would look like, and this looks like a fantastic blueprint for that.

That said, I’m not at all a fan of Futures on a conceptual level.
I think it all comes down to a resource-management point of view. This has several aspects, the most obvious one being that if await ends up never being called, errors can go unnoticed, and resources can stay in use for an unbounded amount of time (forever, if the computation never finishes). It was nice to see you catch errors and propagate them on await, but I’m of the opinion that it simply should not be possible to forget.
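To make the concern concrete, here is a minimal future sketched in plain Crystal. `MiniFuture` is a hypothetical class, not concur's `Future`: it runs the block in its own fiber, stores either the value or the exception on a buffered channel, and re-raises on `await`. If `await` is never called, nothing ever looks at the stored exception.

```crystal
# Hypothetical minimal future (not concur's implementation).
class MiniFuture(T)
  @channel : Channel(T | Exception)

  def initialize(&block : -> T)
    # Capacity 1, so the fiber can deliver its outcome and finish even if
    # nobody ever calls `await`.
    @channel = Channel(T | Exception).new(1)
    spawn do
      outcome = begin
        block.call
      rescue ex
        ex
      end
      @channel.send(outcome)
    end
  end

  # Blocks until the computation finishes; re-raises its exception, if any.
  def await : T
    outcome = @channel.receive
    raise outcome if outcome.is_a?(Exception)
    outcome
  end
end

ok = MiniFuture(String).new { "WOW".downcase }
puts ok.await # prints wow

failing = MiniFuture(Int32).new { "boom".to_i } # raises ArgumentError in its fiber
begin
  failing.await
rescue ex : ArgumentError
  puts "propagated on await: #{ex.class}"
end

# Never awaited: if this block raises, the exception is captured by the fiber
# and then dropped, with nothing reporting it.
MiniFuture(Int32).new { "oops".to_i }
```

The error only surfaces because someone called `await`; the last line shows how easily that join point can be skipped.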

A second, less obvious aspect of resource usage is that if the Future makes use of a local variable from the outer scope, things may break, or end up as undefined behavior, when the await happens outside the current function. This means that innocent-looking refactorings may end up breaking things.
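One way this shows up in Crystal: a block that runs later (the one given to `spawn`, and presumably the one given to `Future.new`) captures the variable itself, not its current value, so reassigning it before the fiber runs changes what the fiber computes. A small sketch with plain `spawn` and a channel; `send_doubled` is a hypothetical helper, and the printed values assume the default single-threaded scheduler.

```crystal
x = 1
ch = Channel(Int32).new

# The block captures the variable `x`, not the value it holds right now.
spawn { ch.send(x * 2) }

# The fiber has not run yet, so it will see this reassignment.
x = 10
captured = ch.receive
puts captured # prints 20

# Hypothetical fix: pass the value as a method argument, so the fiber closes
# over a parameter that later reassignments of `x` cannot touch.
def send_doubled(value : Int32, ch : Channel(Int32))
  spawn { ch.send(value * 2) }
end

x = 1
send_doubled(x, ch)
x = 10
pinned = ch.receive
puts pinned # prints 2
```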

But all that said, those channels look damned nice. I’ll have to think hard about how to incorporate something similar into my nested scheduler shard 🙂


Thanks for the feedback! I think I understand your concerns around local variables from the outer scope; that’s definitely something I could highlight with some examples, maybe in a gotchas section 👍

if await ends up not being called, then it is possible to get silent errors, or silent resource usage for an unbounded (or infinite if it never finishes) amount of time.

Yes, that’s definitely a risk, but I’m not sure whether it has to do with await in particular or with concurrency more generally. For example, don’t you run the same risk when spinning off fibers that might write to a channel that no one reads?
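In plain Crystal that scenario looks like this: a fiber sends on an unbuffered channel nobody reads, so it stays parked forever while the main fiber carries on, and nothing reports it. A minimal sketch:

```crystal
delivered = false
results = Channel(Int32).new # unbuffered: send blocks until someone receives

spawn do
  results.send(42) # no receiver ever shows up, so this fiber parks here
  delivered = true # never reached
end

Fiber.yield # give the fiber a chance to run until it blocks on `send`
puts delivered # prints false

# The main fiber continues (and may exit) while the orphan stays suspended,
# holding on to its stack and everything its closure references.
```

The program exits normally when the main fiber returns; the blocked fiber is simply abandoned.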


don’t you have the same risk when spinning off fibers that might write to a channel that no one reads?

In vanilla Crystal, yes.

But if you use my shard, execution will not move past the end of the block until the fibers spawned inside it are done (thus preventing orphan fibers), and, by default, any errors raised in those fibers will be caught and propagated to the main fiber. The problem can only arise if there is no well-defined, mandatory fiber join point that cannot be passed under any circumstance. In other words, it is only an issue if no structure is imposed on the concurrency to prevent it.
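A minimal sketch of such a join point in plain Crystal, assuming nothing about how the shard itself implements it: `FiberGroup` and `concurrently` are hypothetical names. Every fiber started through the group must finish before `concurrently` returns, and the first exception raised in any of them is re-raised in the calling fiber.

```crystal
# Hypothetical sketch, not the shard's API.
class FiberGroup
  @pending : Int32 = 0
  @done : Channel(Exception?) = Channel(Exception?).new

  # Starts a fiber and reports its outcome (nil or the exception) on @done.
  def start(&block : ->)
    @pending += 1
    spawn do
      error = nil
      begin
        block.call
      rescue ex
        error = ex
      end
      @done.send(error)
    end
  end

  # The join point: waits for every started fiber, then re-raises the first
  # error received, if any.
  def wait : Nil
    first_error = nil
    @pending.times do
      if error = @done.receive
        first_error ||= error
      end
    end
    @pending = 0
    raise first_error if first_error
  end
end

# `ensure` makes the join point unconditional, even if the block itself raises.
def concurrently(&)
  group = FiberGroup.new
  begin
    yield group
  ensure
    group.wait
  end
end

log = [] of String
begin
  concurrently do |group|
    group.start { log << "a" }
    group.start { raise "boom" }
    group.start { log << "c" }
  end
rescue ex
  puts "caught in caller: #{ex.message}" # prints caught in caller: boom
end
puts log.sort # prints ["a", "c"]
```

The design choice here is that the caller cannot get past `concurrently` without every child fiber having reported back, which is what rules out both orphans and silently swallowed errors.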

That said, I do some pretty nasty monkeypatching to make all that possible 🙂

Oh, I see, that makes sense to me now 🙂
I’ll have a look at it!

Eventually I plan to make it into a starting point for making some progress on the structured concurrency issue, but right now it is a bit heavier than I want it to be, as I haven’t figured out how to work within an existing thread pool without spawning a new thread. Being able to spawn a totally new thread pool is a very nifty feature, but by default it should work within the same thread pool the original was running in.
