Is concurrency broken?

(Sorry for the clickbait title, I’m not sure how to frame this better.)

I’ve been using Crystal for a year now, and I’m pretty much ready to give up on the most promising use case that drew me in: a tool for developing daemons. It has a lot going for it: really well-done static analysis, a wonderful toolchain and much more. Sadly, one crucial piece is missing: a concurrency workflow I can actually use.

To demonstrate the problem, let’s look at Socket, or IO::Evented: once I call io.read, there is no documented way to cancel this “blocking call”, short of closing the io. Why would I need that? The first thing that comes to mind is a graceful fiber shutdown; another is data isolation. Let me explain.

Each socket is bi-directional: you call read and write on the same object. So if I need asynchronous operation (meaning there is no set order of instructions, but rather a number of callbacks that handle independent events), I have two basic choices: 1) spawn two fibers that share the same io, or 2) start drowning in emulating select() on top of fibers that emulate threads on top of select().

Spawning two fibers looks easy until you realise that synchronising state and handling errors is your job: you can’t return a value from a fiber, and you can’t rescue an exception raised in another fiber. So now you need a third fiber that sits on top of a channel, reads the last words of your reader and writer, analyzes them and hopefully makes sense of what just happened.
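For concreteness, here is a minimal sketch of that two-fiber arrangement, assuming a `UNIXSocket.pair` stands in for a real connection (the names `outbox`, `last_words` and `received` are made up for illustration). Each fiber reports its exit reason on a buffered channel, and the main fiber plays the part of the third, supervising fiber.

```crystal
require "socket"

sock, peer = UNIXSocket.pair
outbox = Channel(String).new
last_words = Channel(String).new(2) # buffered: a dying fiber never blocks on it
received = [] of String

# Reader fiber: uses the read side of `sock`.
spawn do
  begin
    while line = sock.gets
      received << line
    end
    last_words.send "reader: eof"
  rescue ex
    last_words.send "reader: #{ex.class}"
  end
end

# Writer fiber: uses the write side of the very same `sock`.
spawn do
  begin
    while msg = outbox.receive?
      sock.puts msg
      sock.flush
    end
    last_words.send "writer: outbox closed"
  rescue ex
    last_words.send "writer: #{ex.class}"
  end
end

outbox.send "ping"
echoed = peer.gets # the peer sees what the writer sent
peer.puts "pong"
peer.flush
peer.close   # peer hangs up, so the reader will see EOF after "pong"
outbox.close # we stop writing, so the writer loop ends

# The supervisor collects both reports; their order is not guaranteed.
reports = Array.new(2) { last_words.receive }
sock.close
puts reports.sort
```

Note that both fibers still hold the same `sock`, which is exactly the shared state being complained about; the sketch only makes the "last words" explicit.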

At some point in this journey I realized there is a symmetry between fibers/channels and processes/pipes, the things that make userspace concurrent. I guess there’s an argument for the whole fork model also being a form of CSP, but guys!

  • the kernel has one more crucial tool: signals
  • you can’t fork your way out of every problem, that’s why select() exists!

Anyway, I started looking around and tinkering with the scheduler, and came up with a set of ideas for “interrupting” a “blocking call”, so that I can have non-shared data, some code around it, and a way to use all of that without going insane.

(And then I read the Pony tutorial. I haven’t written any Pony yet, but as I was reading about it, all my boxes were being checked; it felt like God was answering my prayers lol.)

So now I have ideas for Crystal that I was hoping to turn into uploadable code, but I wasn’t able to, and I’m not sure I will be anymore. If anyone is interested, I’d like to pass them on somehow. I have an application built with these patches that seems to work; I can share the code, but I think it needs my comments. If anyone is interested, I’m available (and eager) for a call and a screenshare; otherwise maybe I’ll find the inspiration to hack together some sort of article.

TL;DR: maybe I’m missing something really big, but I don’t see how I can write a correct concurrent program using vanilla Crystal.


Crystal’s concurrency model came from Go, where it’s been proven to work for a pretty massive number of use cases. It sure works in a whole lot more places than Ruby’s concurrency model.

Just because it doesn’t fulfill your specific need doesn’t mean it’s broken. It just means that Crystal’s concurrency model doesn’t cover the complete set of all possible use cases for concurrent I/O. It does, however, make the vast majority of cases downright trivial. This is pretty amazing because concurrency and I/O are both hard problems.

I’m not familiar with Pony, but maybe a library could be built around a similar model. It would probably have to dig underneath the more conventional IO API in Crystal, but maybe there’s an opportunity there.

You can set a read timeout on sockets, FWIW… Also, I wonder how Go is able to enforce timeouts? Hmm…

I’m not looking for a fight here, and I’m more than willing to concede that there is something I don’t know, but come on, that something isn’t Go’s model or a timeout argument.

If it’s easy to do, could you point me to an example? I’m looking for a production-quality, correct and complete program, though, not a hello world. There is a topic on this forum called “concurrency challenge” with nothing in it so far, and there is a video of a trivial chat server, which is exactly that hello world.

than Ruby’s concurrency model

On the one hand that’s a valid point, on the other – at least Ruby gives me my fork and my select, unlike Crystal.

doesn’t cover the complete set of all possible use cases for concurrent I/O

IO is just one example: as soon as you find yourself inside any “blocking call”, your options are limited. An IO you can at least close; try breaking out of a sleep. It’s possible, but undocumented.
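As far as I know, the way current Crystal versions offer an interruptible sleep is to `select` on a stop channel plus a `timeout` instead of calling `sleep` directly. A minimal sketch; `interruptible_sleep` is a hypothetical helper. Closing the `stop` channel wakes every fiber waiting on it at once, which is about as close to a broadcast signal as vanilla channels get.

```crystal
# Hypothetical helper: waits for `span`, but returns early if `stop` is closed
# (or receives a value). Returns true when interrupted, false on timeout.
def interruptible_sleep(span : Time::Span, stop : Channel(Nil)) : Bool
  select
  when stop.receive?
    true
  when timeout(span)
    false
  end
end

stop = Channel(Nil).new
result = Channel(Bool).new

spawn do
  # Would wait 10 seconds if nobody interrupted it.
  result.send interruptible_sleep(10.seconds, stop)
end

sleep 10.milliseconds # let the sleeper start waiting
stop.close            # wakes every fiber selecting on `stop`
interrupted = result.receive

# With nobody closing the channel, the timeout branch wins.
timed_out_early = interruptible_sleep(20.milliseconds, Channel(Nil).new)
puts({interrupted, timed_out_early})
```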

As for the use cases, that is of course correct. Crystal still looks good for code that doesn’t have to be a production daemon.

I don’t know if this applies to your problem, but someone suggested the following for a different problem, and it seems to at least let you break out of a sleep, in a way:

Use a channel to decouple each IO from the rest of your program. On the other end of the channel you can then handle your things, and even microsleep and check whether anything is available on the channel instead of just waiting on it.
If I understand the problem correctly, that should let you cancel, if not the IO itself, then whatever fiber is waiting for it (through the channel).
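A minimal sketch of that suggestion, assuming a `UNIXSocket.pair` as the IO; `pump`, `shutdown` and the other names are hypothetical. One fiber owns the read side and forwards lines into a channel, and a consumer `select`s between those lines and a shutdown channel. It also shows the limit raised in the reply: cancelling the consumer does not cancel the pending `gets`, which only ends once the IO reaches EOF or is closed.

```crystal
require "socket"

# Hypothetical helper: a fiber that owns `io`'s read side and forwards lines
# into a channel; the channel is closed on EOF or on an IO error.
def pump(io : IO) : Channel(String)
  lines = Channel(String).new
  spawn do
    begin
      while line = io.gets
        lines.send line
      end
    rescue IO::Error
      # the io was closed under us; treat it like EOF
    ensure
      lines.close
    end
  end
  lines
end

local, remote = UNIXSocket.pair
lines = pump(local)
shutdown = Channel(Nil).new
consumer_done = Channel(Nil).new
seen = [] of String

spawn do
  loop do
    select
    when line = lines.receive?
      break unless line # pump hit EOF
      seen << line
    when shutdown.receive?
      break # cancels the consumer, not the read
    end
  end
  consumer_done.close
end

remote.puts "hello"
remote.flush
sleep 50.milliseconds # give the fibers a chance to run
shutdown.close
consumer_done.receive?

# The pump is still parked in `local.gets`; only EOF (or a close) ends it.
remote.close
leftover = lines.receive? # nil once the pump sees EOF and closes `lines`
local.close
```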


No, sorry, that doesn’t really work.

Use a channel to decouple each IO from the rest of your program

That would make some sense if you could Channel#select on both an incoming channel to listen to data to write to socket AND the io to listen to data to read from socket, but can you?

microsleep and check if anything is available on the channel

Not only was Channel#empty? removed, but that whole pattern is what I call emulating select on top of fibers. IMO it’s not a productive way forward.

not cancel the IO itself, but cancel whatever fiber is waiting for it

If I don’t need to handle errors or shut down fibers, the problem mostly goes away, but that’s why I list correctness as one of my goals. If I need to disconnect from the counterparty, I have to cancel the IO itself, and if I do that while reading, I have two problems: a) shared state (how do I hold a reference to the io outside the fiber that reads it?) and b) an exception that I need to analyze: is it a failure or not?
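One way to at least make question b) answerable, sketched under assumptions: the fiber that decides to hang up sets a flag before closing the socket, and the reader classifies whatever comes out of the interrupted `gets` (an `IO::Error`, or possibly plain EOF depending on the runtime version) by looking at that flag. `closing` and `verdict` are hypothetical names, and the flag is still shared state; it is just made explicit.

```crystal
require "socket"

sock, peer = UNIXSocket.pair
closing = false # hypothetical flag: "we hung up on purpose"
verdict = Channel(String).new

spawn do
  begin
    while sock.gets
      # a real reader would handle the line here
    end
    # EOF: either the peer left, or our own close surfaced as EOF
    verdict.send(closing ? "cancelled" : "peer hung up")
  rescue ex : IO::Error
    # the read was interrupted; the flag says whether that was intentional
    verdict.send(closing ? "cancelled" : "failed: #{ex.message}")
  end
end

sleep 10.milliseconds # let the reader park inside `gets`
closing = true        # set the flag before closing
sock.close            # closing the io is what unblocks the pending read
result = verdict.receive
peer.close
puts result
```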

Hi stronny!

I agree that those things are probably difficult to do in Crystal. But maybe they are inherently complex and would require a similar effort in other languages?

Can you provide working code for what you need to achieve in other languages, for example Ruby, Java, C# or any other language you are familiar with? Then we can start comparing how easy or hard it is to do those things compared to other languages.

Hey Ary!

In Ruby I would typically either fork or use EventMachine or another epoll wrapper. Actually, I can’t really compare Crystal directly, because I hadn’t even tried to solve these problems before.

The main point I’m trying to make isn’t that some other tool is better; it’s that Crystal resembles the OS:

  • process is fiber
  • pipe is channel
  • scheduler exists
  • there are blocking calls

However, without signals this model doesn’t really work. Imagine being unable to kill() a process in a real OS, or that once you enter any blocking syscall there is no EINTR.

On the one hand, with fibers we have “synchronous” code paths, on the other we still need some degree of controllable asynchronicity even inside the fibers, otherwise this model is not very useful.

Another way to look at the problem is via the scheduler itself. I think it would be useful to create and expose a primitive action that I can describe as “sleep until one of these events happens and tell me which ones did”. That goes against the current model, though. Channels look like they can do that, but then the language needs to translate real system events into channel actions internally.
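Crystal already does this kind of translation for OS signals: as far as I know, the runtime calls a `Signal#trap` block from an ordinary fiber rather than from the raw signal handler, so the block can turn a real signal into a channel action. A minimal sketch, assuming a Unix system; `usr1` is a hypothetical name, and SIGUSR1 is used only because trapping it stops its default action from killing the process. This does not give fibers a kill() for each other, but it does bridge a system event into `select`.

```crystal
usr1 = Channel(Nil).new(1)

Signal::USR1.trap do
  # Non-blocking send so the runtime's signal-handling fiber never stalls;
  # if a notification is already pending, this one is coalesced into it.
  select
  when usr1.send(nil)
  else
  end
end

Process.signal(Signal::USR1, Process.pid) # deliver a real signal to ourselves

got = false
select
when usr1.receive
  got = true
when timeout(2.seconds)
end
puts got ? "SIGUSR1 arrived as a channel message" : "no signal within 2s"
```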

how easy or hard it is to do those things

How easy would it be to develop a program that sends a Time over tcp, receives it back and prints the difference? It took me half a year in Crystal.
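For reference, the happy-path version might look like this sketch: an echo server on an ephemeral localhost port, with made-up names (`sent_ms`, `rtt`). It deliberately ignores everything that makes the real program hard: timeouts, partial failures, reconnects and shutdown.

```crystal
require "socket"

server = TCPServer.new("127.0.0.1", 0) # port 0: let the OS pick a free port
port = server.local_address.port

# Echo server: sends every line straight back.
spawn do
  if client = server.accept?
    while line = client.gets
      client.puts line
      client.flush
    end
    client.close
  end
end

sock = TCPSocket.new("127.0.0.1", port)
sent_ms = Time.utc.to_unix_ms
sock.puts sent_ms
sock.flush
reply = sock.gets
raise "server closed the connection" unless reply

rtt = Time.utc - Time.unix_ms(reply.to_i64)
puts "round trip: #{rtt.total_milliseconds} ms"
sock.close
server.close
```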

Does this work for you?

line_chan = Channel(String).new
sleep_chan = Channel(Nil).new

spawn do
  sleep(5.seconds)
  sleep_chan.send(nil)
end

spawn do
  line = STDIN.gets
  if line
    line_chan.send(line)
  end
end

select
when line = line_chan.receive
  puts "Got line: #{line}"
when sleep_chan.receive
  puts "Got sleep"
end

We spawn two pieces of code, send messages over channels, and determine which one happens first.

The nice thing is that you can use select with anything that can block: a sleep, waiting for IO, etc. Unlike C’s select it’s not tied to just file descriptors.

Channels look like they can do that, but then the language needs to internally translate real system events into some channel action

Indeed, it seems channels can do that. Is the problem then that the underlying implementation is not good enough?

How easy would it be to develop a program that sends a Time over tcp, receives it back and prints the difference?

There’s probably something tricky I’m missing, but isn’t that doing send, receive and comparing the difference?

Crystal is not a 1-to-1 mapping from the language to the underlying machine or OS. It provides some abstractions and makes some decisions that mean you won’t be able to achieve some things if you want to code directly against the machine (as you can in C, C++ and probably Rust). For example, it has a GC and you can’t remove it, so you can’t write every program in it. With concurrency, fork will be dropped too, partly because I think Windows doesn’t have fork, and because fork doesn’t work well with threads. I think Crystal had IO.select before and it was removed; I can’t remember exactly, but maybe it didn’t play nicely with fibers/channels, or maybe it was redundant with spawn/channel. Go doesn’t have it either.

This is the PR where IO.select was removed, together with why it was done: https://github.com/crystal-lang/crystal/pull/4392


There’s probably something tricky I’m missing, but isn’t that doing send, receive and comparing the difference?

Seems trivial, right? A prototype could be made in ten minutes, and then the FUN begins. Let me break this down a bit.

Indeed, it seems channels can do that. Is the problem then that the underlying implementation is not good enough?

Let’s consider a simple chat server: you’ve received a line from a socket, how do you send it to other sockets? You have to have either 1) a shared vector of raw IO instances to call #write on them directly, or 2) a vector of channels, each leading to a fiber that does @io.puts ch.receive.

(1) is problematic on many fronts, but even (2) has shared data: its @io is the same @io that is used to read from another fiber. Question: how do you handle errors?
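Option (2) might be sketched like this, with `IO::Memory` standing in for client sockets so it runs without a network; `ClientWriter` and the error policy in `drain` are hypothetical. Broadcasting then touches only channels, but as noted above, a real client's `@io` would still be shared with the fiber reading from it, and a slow client makes the unbuffered `send` block the broadcaster.

```crystal
# Hypothetical per-client writer: the only fiber that writes to `@io`.
class ClientWriter
  getter outbox = Channel(String).new
  getter finished = Channel(Nil).new

  def initialize(@io : IO)
    spawn { drain }
  end

  private def drain : Nil
    while line = @outbox.receive?
      @io.puts line
    end
  rescue IO::Error
    # Hypothetical policy: a failed write simply ends this writer.
  ensure
    @finished.close
  end
end

sinks = Array.new(3) { IO::Memory.new }
writers = sinks.map { |io| ClientWriter.new(io) }

# Broadcasting only touches channels, never a raw IO.
["hello", "world"].each do |line|
  writers.each &.outbox.send(line)
end

writers.each &.outbox.close
writers.each &.finished.receive? # wait until every writer has drained
puts sinks.map(&.to_s)
```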

An even better example would be spawning a child process. I’d like to have a select like this:

select
when line = input_channel.receive
  child_stdin.send line
when line = child_stdout.receive
  handle_out line
when line = child_stderr.receive
  handle_err line
when status = child_exited.receive
  status_channel.send status
  return
when shutdown = shutdown_channel.receive
  child.kill Signal::TERM
end

That is fine for a prototype, but not good enough for a finished program. What if child_stdin.send blocks? Again, how do you handle errors?
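On the narrow question of a blocking `send`: current Crystal `select` accepts send actions as well as `timeout`, so a send can at least be bounded. A minimal sketch; `send_with_deadline` is a hypothetical helper, and deciding what to do when it returns false (drop, buffer, kill the child) is still up to the program.

```crystal
# Hypothetical helper: tries to hand `value` to a receiver, giving up after `limit`.
# Returns true if the value was delivered.
def send_with_deadline(ch : Channel(String), value : String, limit : Time::Span) : Bool
  select
  when ch.send(value)
    true
  when timeout(limit)
    false
  end
end

stuck = Channel(String).new # nobody ever receives from this one
sent_to_stuck = send_with_deadline(stuck, "line", 20.milliseconds)

ready = Channel(String).new
spawn { ready.receive } # a consumer that is actually there
delivered = send_with_deadline(ready, "line", 1.second)
puts({sent_to_stuck, delivered})
```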

Basically my problems come down to several connected issues:

  1. Shared mutable data. If you have the same io for reading and writing in separate fibers, this io shares state. It doesn’t deny you anything, but it makes reasoning very hard. If you have it in a single fiber, how do you read and write async?
  2. No way to interrupt a blocking call. It’s much easier to think in terms of (state) → init → block → finalize, and there is no way to accomplish that.
  3. No parent-child relationships, and nothing to hold on to when thinking about your program’s structure.

Crystal is not a 1 to 1 mapping from the language

I’m not arguing that it should be; I understand all that and agree. What I’m pointing out is that it’s fine to wrap the low level as long as your wrapper lets you be productive, ideally more productive than using the thing you wrap.

Again, I’d like to point out the similarity between Linux concurrency and Crystal concurrency. Another thing I discovered on this journey is the similarity between Channel::Unbuffered#send and a non-local goto.

Here’s a whole bunch of code: https://github.com/stronny/crystal-concurrency-whatever

It’s not meant as a release of any kind, please excuse the mess.

Main things of interest:

  • a job is allocated inside the constructor, so by default its reference only exists inside the fiber’s stack
  • each fiber has a parent; any parent will only finish after all its children have also finished
  • something of a resemblance to SIGINT and SIGCHLD
  • also custom “signals” with runtime dispatch (something I would prefer to make static, but unsure how)
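The parent/child rule in the list can be approximated in plain Crystal with a channel that children report to. A minimal sketch, not the code in the linked repository: `Nursery` (a name borrowed from Python's Trio) is hypothetical. The parent's `wait` only returns once every child has finished, and the first child exception is re-raised in the parent's fiber, which also works around not being able to rescue an exception from another fiber.

```crystal
# Hypothetical structured-concurrency helper.
class Nursery
  @count = 0
  @done = Channel(Exception?).new

  def spawn_child(&block : ->)
    @count += 1
    spawn do
      begin
        block.call
        @done.send nil
      rescue ex
        @done.send ex # report the failure instead of losing it
      end
    end
  end

  # Waits for every child; re-raises the first failure in the caller's fiber.
  def wait : Nil
    first_error : Exception? = nil
    @count.times do
      err = @done.receive
      first_error ||= err
    end
    @count = 0
    if e = first_error
      raise e
    end
  end
end

log = [] of String

parent = Nursery.new
parent.spawn_child { sleep 10.milliseconds; log << "slow" }
parent.spawn_child { log << "fast" }
parent.wait # returns only after both children are done

failing = Nursery.new
failing.spawn_child { raise "child failed" }
failing.spawn_child { log << "sibling" }
caught = nil
begin
  failing.wait
rescue ex
  caught = ex.message
end
puts log, caught
```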

What if child_stdin.send blocks?

How would you handle that in other languages? Just wondering…

Maybe some kind of bidirectional channel (or two channels?) for each descriptor… I think select is supposed to work on channels, not on file descriptor commands…

You can handle errors on their own channel or by closing a channel [?]

Speaking of which, I’m assuming Crystal channels support some of the things Go’s do, like timeouts and “nonblocking” options? Anyway, reading through the Go channel docs might be valuable. Sorry I don’t have much experience with raw sockets and such in Crystal yet to help out more; good luck!
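On the “nonblocking” question: Crystal's `select` accepts an `else` branch, which turns a receive into a non-blocking one, and `receive?` returns nil once a channel is closed and drained. A minimal sketch; `try_receive` is a hypothetical helper. It is also the closest replacement for the removed `Channel#empty?` check, with the caveat from earlier in the thread that polling it in a loop is still emulating select.

```crystal
# Hypothetical helper: returns a value if one is ready right now, nil otherwise.
# It also returns nil for a closed channel, so callers that care about the
# difference need to check `closed?` separately.
def try_receive(ch : Channel(String)) : String?
  select
  when value = ch.receive?
    value
  else
    nil
  end
end

inbox = Channel(String).new(1) # buffered, so the `send` below does not block
first = try_receive(inbox)     # nothing queued yet
inbox.send "ready"
second = try_receive(inbox)    # picks up the queued value
inbox.close
third = try_receive(inbox)     # closed and drained
puts({first, second, third, inbox.closed?})
```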

Just because I had a hard time finding anything about this:


Crystal’s concurrency is just fine… you’re asking it to perform asynchronous IO via kqueue or epoll, a.k.a. an “event loop”.

Nim has: https://nim-lang.org/docs/asyncnet.html

And Pony makes use of the Actor model for built-in asynch operations as you’ve discovered.

Go pulls a pragmatic fast one: the socket API in Go is using epoll under the hood, neatly abstracted away from developers.

My recommendation would be to just import epoll directly, as calling C is very straightforward in Crystal, and then bundle it into your app as a shard. I’ve looked at doing the same, and I’m also a big fan of Pony.
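For a sense of how small the C side is, here is a minimal sketch of declaring and calling one libc function from Crystal, assuming Linux (`LibEpoll` is a hypothetical binding name; `epoll_create1` and `close` are the real libc functions). A caveat worth knowing before going further: `epoll_wait` is a blocking syscall, so calling it directly would stall every fiber scheduled on that thread, and Crystal's own event loop already waits on the OS readiness mechanism under the hood.

```crystal
# Hypothetical binding name; the function itself is the real Linux libc call.
lib LibEpoll
  fun epoll_create1(flags : LibC::Int) : LibC::Int
end

epfd = LibEpoll.epoll_create1(0)
raise "epoll_create1 failed (errno #{Errno.value})" if epfd < 0
puts "got epoll fd #{epfd}"
LibC.close(epfd) # release the descriptor; a real shard would wrap this in a class
```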

edit: looks like there’s some effort stubbed out here to pull in libevent. As far as I can tell, all the enqueue/dequeue functions hanging off Crystal::Scheduler are using threads.

Also, a parting thought: IMO there is no reason to use poll/select unless you need to manage a large number of file descriptors. What kinds of loads are you expecting on your daemon? epoll is a big hammer; consider that a regular ol’ message queue may work just fine for your application.