I noticed that in 0.31 Channel#full? was removed with all the channel changes. I use it, so I had to get it working again.
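class Channel(T)
  # Monkeypatch restoring Channel#full? (removed in 0.31).
  def full?
    # Buffered channel: full when the queue has reached capacity.
    if queue = @queue
      return queue.size >= @capacity
    end
    # Unbuffered channel: treated as full when a sender is already waiting.
    !@senders.empty?
  end
end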
require "spec"

describe Channel do
  describe "full?" do
    it "returns true if the channel is full (unbuffered)" do
      ch = Channel(Nil).new
      ch.full?.should eq false
      spawn do
        ch.full?.should eq true
        ch.receive
      end
      ch.send(nil)
      ch.full?.should eq false
    end

    it "returns true if the channel is full (buffered)" do
      ch = Channel(Nil).new(3)
      3.times do
        ch.full?.should eq false
        ch.send(nil)
      end
      ch.full?.should eq true
    end
  end
end
Does that look good? Should I open a PR? Otherwise I’ll just monkeypatch Channel on my end whenever I need it, no problem.
While we wait for a reply from Brian or Juan, I want to note that full? isn’t very reliable: between the moment it returns and the moment you act on the result, the channel might release data, so you’ll get “yes, it’s full” while in reality it’s not.
So the question is: how are you using Channel#full?? Maybe you don’t really need it and it can be coded in a different way?
Also worth noting: in Go you can ask len(chan) for some reason, and with that you can deduce whether it’s full or not, but of course it suffers from the same problem: after you check it the length might have changed.
Scenario 1: Client sends too many packets which fills a channel and the client gets kicked out. There’s another fiber taking packets out of that channel of course.
Scenario 2: A channel of tasks from which a number of workers take them. If the channel is full, spawn more workers (up to a limit).
If removing it was intentional, no worries, I’ll manage. Also, after posting I noticed that #empty? is gone too. I guess for the same reason.
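For what it’s worth, Scenario 1 might not need full? at all. A minimal sketch, assuming a non-blocking `select` with an `else` branch is acceptable here: the send either succeeds immediately or is refused, so the check and the action are one operation and there is no window in between. `try_send` is a hypothetical helper name, not something Channel provides.

```crystal
# Hypothetical helper (not part of Crystal's Channel API):
# try to enqueue a value without blocking.
# Returns true if the value was sent, false if the send would have blocked.
def try_send(channel : Channel(T), value : T) : Bool forall T
  select
  when channel.send(value)
    return true
  else
    # No free buffer slot and no waiting receiver.
    return false
  end
end

packets = Channel(Int32).new(2)

try_send(packets, 1) # buffer has room
try_send(packets, 2) # buffer has room

# The buffer is now at capacity and nobody is receiving,
# so this is the point where the client could be kicked.
unless try_send(packets, 3)
  puts "client is sending too fast"
end
```

The caller only learns whether this particular send went through, which is the one thing the kick-out decision depends on.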
This is getting real hard real fast. You are hitting difficult concurrency problems that, in my opinion, Crystal isn’t really equipped to deal with. I think your best bet at this time is making sure multithreading (MT) is turned off and going ahead with your patches, because solving this in a real way requires a totally different mind- and toolset. I’m not saying yours is bad, I’m saying this is actually one of the hardest problems in today’s computing.
The default is single-threaded. Multithreading is opt-in while these things are sorted out.
Running a Crystal compiler built from source isn’t as trivial as running a released version so it’s important to cut a release that makes allowances for it to let more people try out multithreading.
If you wrap it in a mutex, the check+action should be atomic, right? I built a threadsafe connection pool backed by a channel for one of my apps, and it seemed to work okay once I wrapped it in a mutex. I had to do something similar to @Exilor, but for empty?. If I need to check out a connection, there are none in the pool, and we haven’t reached our capacity, it needs to spin up another connection.
Code is here if you're curious
class ConnectionPool(T)
  @lock = Mutex.new

  def initialize(@capacity = 25, &@new_connection : -> T)
    @current_size = 0
    @channel = Channel(T).new(@capacity)
  end

  def connection
    connection = check_out
    yield connection
  ensure
    check_in connection if connection
  end

  private def check_out : T
    @lock.synchronize do
      if (queue = @channel.@queue) && queue.empty? && @current_size < @capacity
        @current_size += 1
        return @new_connection.call
      end
    end
    @channel.receive
  end

  private def check_in(connection : T) : Nil
    @channel.send connection
  end
end
I suppose it’d be ideal if Channel itself provided the interface for the atomic operation so we didn’t need to worry about mutexes in application code. Something like Hash#fetch where it’s presented as a single operation:
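A hypothetical sketch of what such a single-operation interface could look like, written as a free function so it runs without patching Channel. The name `receive_or` is invented for illustration and is not part of Crystal’s Channel; it assumes a non-blocking `select` with an `else` branch is an acceptable way to get the "take one if available, otherwise fall back" behavior.

```crystal
# Hypothetical helper modeled on Hash#fetch (not a real Channel method):
# return a value from the channel if one is immediately available,
# otherwise return the result of the block.
def receive_or(channel : Channel(T), & : -> T) : T forall T
  select
  when value = channel.receive
    return value
  else
    # Nothing buffered and no sender waiting: use the fallback.
    return yield
  end
end

pool = Channel(String).new(2)

# The channel is empty, so the block supplies the value.
first = receive_or(pool) { "new connection" }

# With a value buffered, the block is not used.
pool.send "pooled connection"
second = receive_or(pool) { "new connection" }

puts first
puts second
```

Note that this only makes the receive itself atomic. Enforcing a capacity limit on how many fallbacks get created would still need its own bookkeeping.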
Making that is trivial. What is less trivial is deciding what goes into your empty and full cases. What should the code do if a buffer is full? Raise? Push out the oldest value? The newest value? Block until there is a free slot? What if there is a state machine? Several state machines?
It’s easy to code a program that almost works.
“it needs to spin up another connection”
Yes, but why do you need a resizeable worker pool? Just spin up the max amount at the start. And come to think of it, why do you need a pool in the first place? The thread pool is the natural limit anyway.
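A minimal sketch of the "spin up the max at the start" idea, assuming the work is a plain channel of tasks. The worker count, buffer sizes, and the doubling "task" are placeholders chosen for illustration.

```crystal
WORKERS = 4 # assumed upper limit, started once up front

tasks   = Channel(Int32).new(16)
results = Channel(Int32).new(16)

WORKERS.times do
  spawn do
    # receive? returns nil once the channel is closed and drained,
    # which ends the worker's loop.
    while task = tasks.receive?
      results.send(task * 2) # placeholder for real work
    end
  end
end

# Producer side: just send. Nobody needs to ask whether the channel is full;
# a full buffer simply makes the sender wait until a worker catches up.
10.times { |i| tasks.send(i) }
tasks.close

sum = 0
10.times { sum += results.receive }
puts sum
```

Because the number of workers is fixed, backpressure comes from the blocking send itself and no fiber has to inspect the channel’s state.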
The removal of Channel#full? and #empty? was intentional. They leaked the state of the channel in a way that can’t be guaranteed. The sender of a buffered channel should not need to be aware of whether the send will block or not (the same goes for the receiver).
If there is a need to limit quota, then I suggest splitting the task and using some fibers for that only. At the end of the day there is already a fiber to accept each request. I don’t expect it to be much different.
@jgaskins regarding the connection pool logic you posted, the creation of connections is serialized. That is an unneeded penalty you are imposing on simultaneous clients. In crystal-db that is addressed by unlocking during the creation of the connection while counting the number of connections being created.
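A rough sketch of that idea, not crystal-db’s actual code: a slot is reserved under the lock, but the (possibly slow) connection is created after the lock is released, so other clients are not held up. `SketchPool`, `@total`, and `idle_connection` are hypothetical names used only for this illustration.

```crystal
class SketchPool(T)
  def initialize(@capacity : Int32 = 25, &@new_connection : -> T)
    @lock = Mutex.new
    @total = 0 # connections already created or currently being created
    @channel = Channel(T).new(@capacity)
  end

  def check_out : T
    # Fast path: reuse an idle connection if one is available right now.
    if conn = idle_connection
      return conn
    end

    # Reserve a slot under the lock, but do not create the connection here.
    reserved = @lock.synchronize do
      if @total < @capacity
        @total += 1
        true
      else
        false
      end
    end

    if reserved
      begin
        # Created outside the lock, so concurrent check-outs are not serialized.
        return @new_connection.call
      rescue ex
        # Give the slot back if creation failed.
        @lock.synchronize { @total -= 1 }
        raise ex
      end
    end

    # At capacity: wait for a connection to be checked in.
    @channel.receive
  end

  def check_in(conn : T) : Nil
    @channel.send conn
  end

  # Non-blocking receive; returns nil when nothing is idle.
  private def idle_connection : T?
    select
    when conn = @channel.receive
      return conn
    else
      return nil
    end
  end
end

created = 0
pool = SketchPool(Int32).new(2) { created += 1 }

a = pool.check_out # creates connection 1
b = pool.check_out # creates connection 2
pool.check_in a
c = pool.check_out # reuses the idle connection
puts created
```

If a connection is checked in between the idle check and the reservation, the pool may create a new connection while one sits idle. That stays within the capacity limit, so the sketch accepts it rather than holding the lock longer.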