Issue with crashes/hangs with fibers/channel and PG DB

There is something else going on here, and I don’t know what yet. I added a mutex to the pool prepared statement at GitHub - crystal-lang/crystal-db at fix/pool-prepared-statement, but some hangs are still happening.

@omgponies I don’t think this is a “that’s your problem” situation, but there are some things that can be done slightly differently in your code that would probably make the issue go away.

In your example you are spawning a new fiber for each entry in the hash. Each of these fibers will try to take a connection from the pool, so the only way to control how many items are processed concurrently is to tweak the connection pool itself. What is worse, if the processing takes time, another waiting fiber could time out while waiting for a connection. Not ideal.

I would suggest decoupling the enqueueing of jobs / items to process from the workers. You can then choose how many workers to have independently of how many items you have to process. Each worker will use one connection from the pool at a time, with less chance of timing out (unless the pool size is smaller than the worker count). This will also allow you to share the pool with other uses.

Without changing your code too much, this can be achieved with:

require "db"
require "pg"

WORKERS              =  5
FIXED_POOL_SIZE      = 10
DB_CONNECTION_STRING = "postgres://postgres@localhost/crystal?max_pool_size=#{FIXED_POOL_SIZE}&initial_pool_size=#{FIXED_POOL_SIZE}&max_idle_pool_size=#{FIXED_POOL_SIZE}&retry_attempts=30&checkout_timeout=60"
TEST_DB              = DB.open(DB_CONNECTION_STRING)

TEST_DB.exec "drop table if exists my_table"
TEST_DB.exec "CREATE TABLE my_table (id int)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (1)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (2)"

class A
  def go
    numbers_hash = Hash(Int32, Int32).new
    while numbers_hash.size < 500
      x = (3..12056).sample
      numbers_hash[x] = x * 10
    end

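    # Queue of pending jobs; the workers pull from it.
    jobs = Channel({a: Int32, b: Int32}).new(WORKERS)
    # One message per finished job; buffered so workers never block on send.
    channel = Channel(UInt8).new(numbers_hash.size)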

    WORKERS.times do
      spawn do
        while (job = jobs.receive)
          B.new.woo(job[:a], job[:b])
          channel.send 1_u8
        end
      end
    end

    numbers_hash.each do |a, b|
      jobs.send({a: a, b: b})
    end

    counter = 0_u64
    numbers_hash.each {
      puts "X: #{counter += 1}/#{numbers_hash.size}"
      channel.receive
      puts "Y: #{counter}/#{numbers_hash.size}"
    }
  end
end

class B
  def boo(a, b)
    test_count = {0, 1}.sample
    1_u8
  end

  def woo(a, b)
    # this SQL only ever returns 0 or 1
    test_sql = "select count(*)::smallint as example_count from my_table where id=$1;"
    test_count = TEST_DB.query_one test_sql, a, as: {Int16}
    1_u8
  end
end

A.new.go

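The channel named channel needs to be buffered; otherwise channel.send 1_u8 would get stuck. The main fiber only starts receiving results after it has enqueued every job, so with an unbuffered channel the workers would block on send, the jobs channel would fill up, and the main fiber would block as well.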
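For anyone who wants to try the pattern without a database, here is a minimal sketch of the same jobs/workers split. The names process, run_workers, WORKER_COUNT and ITEMS are made up for illustration and are not part of the code above; process is just a stand-in for the query. It also closes the jobs channel and uses receive? so the worker fibers finish once the queue is drained, which the example above does not do.

```crystal
# Stand-alone sketch of the worker pattern; no database involved.
# All names here are hypothetical and only for illustration.
WORKER_COUNT = 3
ITEMS        = (1..10).to_a

# Stand-in for the real work (e.g. a DB query per item).
def process(n : Int32) : Int32
  n * 10
end

def run_workers(items : Array(Int32), worker_count : Int32) : Array(Int32)
  # Queue of pending jobs.
  jobs = Channel(Int32).new(worker_count)
  # Buffered to items.size so a worker never blocks on send while
  # the main fiber is still enqueuing jobs.
  results = Channel(Int32).new(items.size)

  worker_count.times do
    spawn do
      # receive? returns nil once the channel is closed and empty,
      # which ends the loop and lets the fiber finish.
      while job = jobs.receive?
        results.send process(job)
      end
    end
  end

  items.each { |item| jobs.send item }
  jobs.close

  # Collect exactly one result per item (order is not guaranteed).
  Array(Int32).new(items.size) { results.receive }
end

output = run_workers(ITEMS, WORKER_COUNT)
puts output.sort
```

The number of workers and the number of items are independent here, which is the point of the suggestion above: with a real pool you would keep the worker count at or below the pool size.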
