The Crystal Programming Language Forum

Restoring Channel#full?

I noticed that in 0.31 Channel#full? was removed with all the channel changes. I use it, so I had to get it working again.

class Channel(T)
  def full?
    if queue = @queue
      return queue.size >= @capacity
    end

    !@senders.empty?
  end
end



require "spec"

describe Channel do
  describe "full?" do
    it "returns true if the channel is full (unbuffered)" do
      ch = Channel(Nil).new
      ch.full?.should eq false
      spawn do
        ch.full?.should eq true
        ch.receive
      end
      ch.send(nil)
      ch.full?.should eq false
    end

    it "returns true if the channel is full (buffered)" do
      ch = Channel(Nil).new(3)
      3.times do
        ch.full?.should eq false
        ch.send(nil)
      end
      ch.full?.should eq true
    end
  end
end

Does that look good? Should I open a PR? Otherwise I’ll just monkeypatch Channel on my end whenever I need it, no problem.

While we wait for a reply from Brian or Juan, I want to note that full? is inherently racy: between the check and whatever you do next, the channel might release data, so you’ll get “yes, it’s full” when in reality it no longer is.

So the question is: how are you using Channel#full? ? Maybe you don’t really need it and it can be coded in a different way?

Also worth noting: in Go you can ask len(chan) for some reason, and with that you can deduce whether it’s full or not, but of course it suffers from the same problem: after you check it the length might have changed.

Scenario 1: Client sends too many packets which fills a channel and the client gets kicked out. There’s another fiber taking packets out of that channel of course.
Scenario 2: A channel of tasks from which a number of workers take them. If the channel is full, spawn more workers (up to a limit).

If removing it was intentional no worries, I’ll manage. Also after posting I noticed that #empty? is gone too. I guess because of the same reason.
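
For Scenario 1, one way to avoid asking the channel about its state is a non-blocking send using select with an else branch, so the check and the action are a single operation. The following is a minimal sketch, not something proposed in this thread: try_send is a hypothetical helper name, and the packet values are made up for illustration.

```crystal
# Hypothetical helper (not part of Channel's API): attempt a send without blocking.
# Returns true if the value was accepted, false if the send would have blocked.
def try_send(channel : Channel(T), value : T) : Bool forall T
  select
  when channel.send(value)
    return true
  else
    # The buffer is full and no receiver is waiting.
    return false
  end
end

packets = Channel(Int32).new(2) # buffered channel with room for 2 packets

# With nobody receiving, the third send should be rejected instead of blocking.
results = [1, 2, 3].map { |packet| try_send(packets, packet) }
p results
```

A caller could kick the client when try_send returns false. Unlike a separate full? check followed by send, there is no window in which the channel can change between the two steps.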

This is getting real hard real fast. You are hitting difficult concurrency problems that Crystal isn’t really equipped to deal with (just my opinion, of course). I think your best bet at this time is to make sure multithreading (MT) is turned off and go ahead with your patches, because solving this in a real way requires a totally different mind- and toolset. I’m not saying yours is bad, I’m saying this is actually one of the hardest problems in today’s computing.

I don’t know, it has been working fine so far… even with mt enabled.

The default is single-threaded. Multithreading is opt-in while these things are sorted out.

Running a Crystal compiler built from source isn’t as trivial as running a released version so it’s important to cut a release that makes allowances for it to let more people try out multithreading.

If you wrap it in a mutex, the check+action should be atomic, right? I built a threadsafe connection pool backed by a channel for one of my apps, and it seemed to work okay once I wrapped it in a mutex. I had to do something similar to @Exilor, but for empty?. If I need to check out a connection, there are none in the pool, and we haven’t reached our capacity, it needs to spin up another connection.

Code is here if you're curious
  class ConnectionPool(T)
    @lock = Mutex.new

    def initialize(@capacity = 25, &@new_connection : -> T)
      @current_size = 0
      @channel = Channel(T).new(@capacity)
    end

    def connection
      connection = check_out
      yield connection
    ensure
      check_in connection if connection
    end

    private def check_out : T
      @lock.synchronize do
        if (queue = @channel.@queue) && queue.empty? && @current_size < @capacity
          @current_size += 1
          return @new_connection.call
        end
      end

      @channel.receive
    end

    private def check_in(connection : T) : Nil
      @channel.send connection
    end
  end

I suppose it’d be ideal if Channel itself provided the interface for the atomic operation so we didn’t need to worry about mutexes in application code. Something like Hash#fetch where it’s presented as a single operation:

channel.fetch { do_the_empty_thing }
channel.push { do_the_full_thing }

it has been working fine so far

The problem with race conditions is everything works fine until it breaks in an unrelated place, a different one each time for extra fun.

The default is single-threaded.

Defaults change. If the correctness of the code depends on a switch, it’s better to watch that switch.

channel.fetch { do_the_empty_thing }
channel.push { do_the_full_thing }

Making that is trivial, what is less trivial is the contents of your empty and full things. What should the code do if a buffer is full? Raise? Push out the oldest value? The newest value? Block until there is a free slot? What if there is a state machine? Several state machines?

It’s easy to code a program that almost works.

it needs to spin up another connection

Yes, but why do you need a resizable worker pool? Just spin up the maximum number at the start. And come to think of it, why do you need a pool in the first place? The thread pool is the natural limit anyway.

The blog post about multithreading addresses this very concern.

It yields to the block. The behavior of send and receive wouldn’t change.

Connection pool.

It yields to the block

What’s going to be inside the block?

Connection pool.

I’m not sure there’s a difference here. Please explain?

The removal of Channel#full? and #empty? was intentional. They leaked the state of the channel in a way that can’t be guaranteed. The sender on a buffered channel should not need to be aware of whether the send will block or not (and the same goes for the receiver).

If there is a need to limit quota, then I suggest splitting the task and using some fibers for that only. At the end of the day there is already a fiber to accept each request, so I don’t expect this to be much different.

@jgaskins regarding the connection pool logic you posted: the creation of connections is serialized. That is an unneeded penalty you are imposing on simultaneous clients. In crystal-db this is addressed by unlocking during the creation of the connection while still counting the number of connections being created.
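
To illustrate the idea of counting under the lock but creating outside it, here is a rough sketch. It is not the actual crystal-db code; the names @total and @idle and the try_idle helper are assumptions made for this example. A slot is reserved inside the mutex, the (possibly slow) connection is built after the mutex is released, and the reservation is given back if creation fails.

```crystal
# Sketch only: NOT the crystal-db implementation, just the idea described above.
class ConnectionPool(T)
  def initialize(@capacity : Int32 = 25, &@new_connection : -> T)
    @lock = Mutex.new
    @total = 0 # connections already created or currently being created
    @idle = Channel(T).new(@capacity)
  end

  def connection
    conn = check_out
    begin
      yield conn
    ensure
      @idle.send conn # never blocks: at most @capacity connections exist
    end
  end

  private def check_out : T
    if conn = try_idle
      return conn
    end

    # Only the counter update happens under the lock.
    reserved = @lock.synchronize do
      if @total < @capacity
        @total += 1
        true
      else
        false
      end
    end

    # At capacity: wait until another fiber checks a connection in.
    return @idle.receive unless reserved

    begin
      @new_connection.call # runs outside the lock, so creations can overlap
    rescue ex
      @lock.synchronize { @total -= 1 } # release the reserved slot
      raise ex
    end
  end

  # Non-blocking receive: returns nil when no idle connection is available.
  private def try_idle : T?
    select
    when conn = @idle.receive
      return conn
    else
      return nil
    end
  end
end

# Usage with made-up string "connections":
created = 0
seen = [] of String
pool = ConnectionPool(String).new(capacity: 2) { created += 1; "conn-#{created}" }
pool.connection { |c| seen << c }
pool.connection { |c| seen << c } # reuses the idle connection instead of creating one
```

The design choice here is that the mutex only protects the counter, so a slow connect for one client does not hold up others that could create their own connection or pick up an idle one.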