Channels

I’ve been thinking lately about how the Crystal and Go ecosystems use channels as I work with things in Crystal that are commonly done with Go (such as infrastructure and NATS). One thing that I find surprising is that channels are often part of the interface for a Go API but, from what I can tell, Crystal Channels tend to be implementation details that aren’t exposed.

Does that match anyone else’s observations?

I heard that in Go, people often pass channels as function arguments.
I don’t fully grasp what that really means yet, though.

Exactly. I’ve seen a lot of Go functions that both accept channels as inputs and return them as outputs. But I haven’t seen that in Crystal, which I find interesting, and I want to understand why.
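
A hypothetical sketch of what that style could look like translated to Crystal, purely for illustration (the doubled stage is invented): a pipeline function that accepts a channel as input and returns a new channel as output.

def doubled(input : Channel(Int32)) : Channel(Int32)
  output = Channel(Int32).new
  spawn do
    # Forward each value until the input channel is closed and drained.
    while value = input.receive?
      output.send value * 2
    end
    output.close
  end
  output
end

numbers = Channel(Int32).new
spawn do
  (1..3).each { |i| numbers.send i }
  numbers.close
end

results = doubled(numbers)
while value = results.receive?
  puts value # => 2, 4, 6
end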

What I’ve often seen, though, is people using Go channels in the way that we might use Iterator/Enumerable in Crystal:

for item := range ch {
  handle(item)
}

It’s conceptually similar to doing something like this:

while item = channel.receive?
  handle item
end

Part of the reason I’m thinking about this is that I’m writing a method that returns an object that streams values in. I was going to make the return value an Iterator so you could grab one value at a time, but then I realized that’s also exactly what Channel does. It might also be useful to be able to just iterate over the values with each:

channel.each do |item|
  handle item
end

Do you have any concrete examples for APIs using channels? Do similar APIs exist in Crystal where they use different types?

I suppose a Channel#each method could perhaps be useful. Maybe it could even include Enumerable…? :thinking:
Not sure if this is a good idea, but it doesn’t seem too far off: A channel yields a sequence of items, just like an Iterator.
At the very least, we could have something like a ChannelIterator.
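
A minimal sketch of that idea, with the name and shape invented purely for illustration (not an existing API):

struct ChannelIterator(T)
  include Iterator(T)

  def initialize(@channel : Channel(T))
  end

  # Yield buffered values until the channel is closed and drained, then stop.
  def next
    @channel.receive
  rescue Channel::ClosedError
    stop
  end
end

ch = Channel(Int32).new(3)
ch.send 1
ch.send 2
ch.send 3
ch.close

ChannelIterator.new(ch).each { |i| puts i } # => 1, 2, 3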

These are just a few example APIs across well-known projects. Each of these projects uses channels extensively, so these examples aren’t the only places where channels are part of a public interface. I also tried to choose examples that expose channels via APIs a developer would use as a dependency of their app, so they aren’t something you could be expected to know the implementation details of.

This is what I was thinking. The more I rolled it around in my mind, the more I realized a Channel(T) is effectively an unofficial Iterator(T). It just names next as receive, and raises when the channel is closed instead of returning a Stop.

What got me thinking about this is, in my Crystal NATS client, I’m working on the API that notifies you when keys in a KV store or objects in an object store are created/updated. The first-party Go client returns a channel for this.
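
As a rough sketch of the kind of shape I’m considering (the method and type names here are invented for illustration, not the actual client API): the client pushes updates into a channel and hands that channel to the caller.

record KeyUpdate, key : String, value : String

def watch(keys : Array(String)) : Channel(KeyUpdate)
  updates = Channel(KeyUpdate).new(64)
  spawn do
    # In a real client this would be driven by messages from the server;
    # here we just fake one update per key and then stop watching.
    keys.each { |key| updates.send KeyUpdate.new(key, "value for #{key}") }
    updates.close
  end
  updates
end

updates = watch(%w[foo bar])
while update = updates.receive?
  puts "#{update.key} => #{update.value}"
end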

IIRC, waj and I chose not to make Channel an Iterator because the context switch made it far less performant than a regular iterator. Although it can enumerate things, it was one of those instances where we thought it was better to prevent such usage.

The fact that a channel is only known to finish its enumeration when it’s closed is also not ideal. Put another way: when you get a Channel and use it as an enumerator, you never know whether you’re done unless it has been closed.
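
A minimal illustration of that: receive? only returns nil once the channel has been closed, so a consumer that treats the channel as an enumeration blocks forever if the producer never closes it.

ch = Channel(Int32).new

spawn do
  3.times { |i| ch.send i }
  # Without this close, the consuming loop below would block forever
  # waiting for a fourth value.
  ch.close
end

while value = ch.receive?
  puts value
end
puts "done"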

I can definitely understand wanting to discourage people from using Channel as a general-purpose iterator, especially at the time. However, since then, Channel has had a lot of changes (including your complete overhaul for 0.31, IIRC) and Julien did a bunch of work on the synchronization primitives that Channel uses, so it’s a lot faster now than it was back then. Maybe those performance concerns could be revisited?

Quick benchmark:

➜  Code crr channel_iterator.cr
1
2
3
channel 249.26M (  4.01ns) (± 1.13%)  0.0B/op   1.55× slower
  deque 386.43M (  2.59ns) (± 5.41%)  0.0B/op        fastest

Receive while empty
channel 585.39M (  1.71ns) (± 0.96%)  0.0B/op   1.82× slower
  deque   1.07G (  0.94ns) (± 2.41%)  0.0B/op        fastest
Code
class Channel(T)
  include Iterator(T)

  def next
    receive_impl { stop }
  end
end

# Quick function check
channel = Channel(Int32).new(10)
channel.send(1).send(2).send(3)
channel.close
channel.each { |i| puts i }

require "benchmark"

Benchmark.ips do |x|
  c = Channel(Int32).new(10)
  d = Deque(Int32).new

  x.report "channel" do
    c.send 1
    c.next
  end

  x.report "deque" do
    d << 1
    d.shift
  end
end

puts
puts "Receive while empty"
Benchmark.ips do |x|
  c = Channel(Int32).new(10)
  c.close
  d = Deque(Int32).new

  x.report "channel" { c.next }
  x.report "deque" { d.shift? }
end

Still 55% slower than a Deque but, at least on my MacBook Pro, it sends and receives a quarter-billion values per second. Not a perfect comparison because the Deque doesn’t offer the benefit of blocking while empty but, since Channel is implemented via Deque, it seems like a good way to measure the overhead of Channel.

I think that would have to be part of using it as an iterator. The producer side of the channel would have to close it to signal that it won’t yield anything else, as in my example above. And then the channel is inert. That seems to be how a lot of folks use Go channels.

Now, it does assume a single-producer/single-consumer scenario or an orchestrated multi-producer scenario (such as a fan-out workload like a map/reduce) that would then close the channel. Channel is designed for more than that, but it wouldn’t be feasible to use it as an Iterator in other scenarios. So it would be similar to Slice having some methods that exist only for Bytes.
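
As a rough sketch of that kind of orchestration (the numbers and names are just for illustration): several producers send into one channel, and a coordinator fiber closes it once they’ve all reported completion, so the consumer knows when it’s done.

results = Channel(Int32).new(16)
done = Channel(Nil).new
producers = 4

producers.times do |p|
  spawn do
    10.times { |i| results.send(p * 10 + i) }
    done.send nil # report completion
  end
end

# Coordinator: close the results channel once every producer has finished.
spawn do
  producers.times { done.receive }
  results.close
end

count = 0
while results.receive?
  count += 1
end
puts count # => 40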

I could open an issue for us to discuss further if that would be preferable. Then we could get into details around API and implementation.

Making Channel an Iterator on the consuming side raises the question of whether there are things that can be improved wrt iterators on the sending side as well. In particular, the case where the sender has a long list of items has some very low-hanging fruit when it comes to enqueuing a lot of stuff at once. Amortizing the lock acquisition over many simultaneously enqueued items can probably speed things up a bit in certain scenarios, which could be neat to have in the SPMC case.

Perhaps there is a bulk-consume use case when doing fan-in on the receiving side as well :thinking:

Iterator doesn’t have any opinions on the producer side. Do you mean for Channel specifically?

When I think of using channels as iterators, I mainly have SPSC for async work or occasionally MPSC for fan-out work in mind. What is the benefit of using an SPMC channel this way?

Go also merely uses a fiber+channel that you use as a select receive action for handling timeouts.

It means that select only has to deal with channel actions and nothing else. It also allows receiving messages for a set amount of time only, instead of having a timeout for each message (from Go Concurrency Patterns):

func main() {
    c := boring("Joe")
    timeout := time.After(5 * time.Second)
    for {
        select {
        case s := <-c:
            fmt.Println(s)
        case <-timeout:
            fmt.Println("You talk too much.")
            return
        }
    }
}

In Crystal, we’d have to manually recalculate the timeout for each message, or manually start a fiber (with a large stack) that sleeps then writes to the channel, while time.After in Go likely only starts a tiny fiber (2KB stack).
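
For illustration, here’s a rough sketch of that fiber-based approach in Crystal (the after helper is invented here, and it does pay for a whole fiber per timer):

def after(span : Time::Span) : Channel(Nil)
  done = Channel(Nil).new(1)
  spawn do
    sleep span
    done.send nil
  end
  done
end

messages = Channel(String).new
spawn do
  loop do
    sleep 300.milliseconds
    messages.send "hi"
  end
end

# The timer channel is created once and then used as a plain receive
# action, so select only deals with channel operations.
timeout = after(2.seconds)
loop do
  select
  when s = messages.receive
    puts s
  when timeout.receive
    puts "You talk too much."
    break
  end
end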

Do you mean for Channel specifically?

I literally explained what I meant in the next sentences.

What is the benefit of using an SPMC channel this way?

Like I wrote, it would save a whole lot of lock acquisitions (and thus, potentially, reschedulings). So performance.

It would also be a little less to type, e.g.

channel.bulk_push(some_iterable)

instead of

some_iterable.each { |item| channel.push(item) }

Though perhaps it could be turned around, like

some_iterable.into(channel)

Or something like that.
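
As a rough sketch of that sugar (names hypothetical; note this version just loops over send, so it doesn’t yet buy the lock amortization described above, whereas a real implementation inside Channel could take the internal lock once for the whole batch):

module Enumerable(T)
  # Hypothetical sugar: push every element into the channel, then return the channel.
  def into(channel : Channel(T)) : Channel(T)
    each { |item| channel.send item }
    channel
  end
end

ch = Channel(Int32).new(8)
spawn do
  (1..5).into(ch)
  ch.close
end

while value = ch.receive?
  puts value
end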

It was unclear to me, so I asked for clarification. No need to be snippy.

There are other alternatives. I haven’t looked at time.After in Go, but I wrote up this proof of concept that produces 10 items in the channel but only consumes a few before timing out.

Proof of concept
require "log"

c = Channel(Int32).new(10)
spawn do
  100.times do |i|
    sleep 50.milliseconds
    c.send i
  end
  c.close
end

after = Time.monotonic + 500.milliseconds

loop do
  select
  when value = c.receive
    sleep 100.milliseconds
    Log.info { value }
  when timeout(pp after - Time.monotonic)
    break
  end
end

# Overload so `timeout` inside `select` also accepts a TotalSpan and uses
# its remaining time (the stdlib overload takes a plain Time::Span).
def timeout_select_action(total_time : Time::TotalSpan)
  timeout_select_action(total_time.remaining)
end

struct Time
  def self.after(span : Span)
    TotalSpan.new span
  end

  struct TotalSpan
    def initialize(span : Span)
      @timeout_after = Time.monotonic + span
    end

    def remaining
      @timeout_after - Time.monotonic
    end
  end
end

class Channel(T)
  include Iterator(T)

  def each(*, timeout_total : Time::Span, &)
    total = Time::TotalSpan.new(timeout_total)
    loop do
      # `timeout` with a negative time never fires. bug?
      # return if total.remaining <= 0.seconds

      select
      when value = receive
        yield value
      when timeout(total)
        return
      end
    end
  end

  def each(*, timeout_each : Time::Span, &)
    loop do
      select
      when value = receive
        yield value
      when timeout(timeout_each)
        return
      end
    end
  end

  def next
    receive_impl { stop }
  end
end

The producer fiber generates a new element every 50ms. The first pass on the consumer has a 500ms total timeout and takes 100ms to process each item, so it only handles a few of them. The second pass on the consumer just consumes the rest of the channel to illustrate that it picks up where it left off before.

This includes a workaround for the fact that if you pass a negative span (or even 0.seconds) to timeout inside the loop, it isn’t actually timing out. It feels like a bug, but I may also be misunderstanding.

Sorry, to me it is kinda obvious that it is about Channels, as my comment talks about the act of sending. This is because there is no sending going on if it isn’t about the interaction with Channels, and Channels are also the main topic of the whole thread. Iterators also don’t use locks by default - Channel does.

So I’m a bit confused about how there could be a lack of clarity.

You used the word for the abstract type while talking about the concrete type. It’s not a big deal to use the wrong word. I do it, too. But the way my mind works, I assume other people are using the words they intend to use, just like I try to. So when someone says something that doesn’t fit my mental model of the situation, I ask questions to understand whether I’m misunderstanding or they misspoke. If I use the wrong word and someone asks, I’m happy to clarify.

Maybe you’re good at inferring what someone meant. It would be awesome if I were, too, but I infer meaning incorrectly often enough that it’s easier to just ask the other person what they meant.

And now I am even more confused. There are no abstract types involved here. And I have no idea what wrong variant of whichever type I’m supposed to have used, or what you are even referring to.

Perhaps my English is lacking and if so I’m sorry but I really don’t see any weird or unintentional ways to read what I wrote.

Sure there are. Iterator(T) is an abstract type.

Feel free to DM if you want to discuss this further. I don’t want to derail the thread any more than we already have.

Your PoC alternative is exactly what I said: “you’d have to recalculate the timeout for each message” :sweat_smile:

That means that each select has to query the monotonic clock, create a timer, enqueue it, and cancel it on each iteration, while time.After only does so once for all the iterations.

It also demonstrates that the Go solution is composable: both cases behave identically and don’t need special handling; the only difference is when the timer is created. That’s a good example of why we might use channels more in everyday Crystal.

I’m not sure we want to start an actual fiber on every timeout (especially with an 8MB virtual stack), but a timer eventually publishing to a channel is :eyes: