There is something more bothering here. And I don’t know yet. I added a mutex in the pool prepared statement at GitHub - crystal-lang/crystal-db at fix/pool-prepared-statement but some hangs are still happening.
@omgponies I don’t think this is a “is your problem” situation, but there are some things than can be done slightly different in your code that would make the issue go away probably.
In you example you are spawning a new fiber per each entry in the hash. Each of this fiber will try to consume a connection from the pool. So to control how many items you are processing concurrently you will need to tweak the connection pool directly. What is worst is that if the processing takes time, another awaiting fiber could timeout for waiting the connection. Not ideal.
I would suggest to decouple the enqueue of jobs / items to process with workers. You can choose how many workers to have independently of how many items you have to process. Each worker will use one connection of the pool with less chances of timing out (unless the pool size is smaller that the workers count). This will also allow you to share the pool for other uses.
Without changing too much your code, this can be achieved with
WORKERS = 5
FIXED_POOL_SIZE = 10
DB_CONNECTION_STRING = "postgres://postgres@localhost/crystal?max_pool_size=#{FIXED_POOL_SIZE}&initial_pool_size=#{FIXED_POOL_SIZE}&max_idle_pool_size=#{FIXED_POOL_SIZE}&retry_attemps=30&checkout_timeout=60"
TEST_DB = DB.open(DB_CONNECTION_STRING)
TEST_DB.exec "drop table if exists my_table"
TEST_DB.exec "CREATE TABLE my_table (id int)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (1)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (2)"
class A
def go
numbers_hash = Hash(Int32, Int32).new
while numbers_hash.size < 500
x = (3..12056).sample
numbers_hash[x] = x * 10
end
jobs = Channel({a: Int32, b: Int32}).new(WORKERS)
channel = Channel(UInt8).new(numbers_hash.size)
WORKERS.times do
spawn do
while (job = jobs.receive)
B.new.woo(job[:a], job[:b])
channel.send 1_u8
end
end
end
numbers_hash.each do |a, b|
jobs.send({a: a, b: b})
end
counter = 0_u64
numbers_hash.each {
puts "X: #{counter += 1}/#{numbers_hash.size}"
channel.receive
puts "Y: #{counter}/#{numbers_hash.size}"
}
end
end
class B
def boo(a, b)
test_count = {0, 1}.sample
1_u8
end
def woo(a, b)
# this SQL only ever returns 0 or 1
test_sql = "select count(*)::smallint as example_count from my_table where id=$1;"
test_count = TEST_DB.query_one test_sql, a, as: {Int16}
1_u8
end
end
A.new.go
The channel
channel needs to be buffered because otherwise the channel.send 1_u8
will stuck.