Issue with crashes/hangs with fibers/channel and PG DB

First time posting here, so I tried to really make sure I could reproduce this with as little code as possible, which is what is here.

TL;DR: this fails in one of a few specific ways - but only ever in those ways - or it succeeds just fine, and I cannot figure out why.

My expectation, of course, is that it would either run fine or crash the same way every time. The fact that it does a range of different things (even accounting for slight differences in input) doesn’t seem to track with that.

This is from a much larger code base that does Monte Carlo simulations, but I kept cutting away until it got down to this. I’ve sanitized it, and some areas of the code are test stand-ins for where more code would go (e.g. here a hash is filled randomly, whereas in the real code a DB call does it, etc.).

I can run this many times in sequence and it always does one of the following:

  1. runs totally fine (great!)

  2. hangs (at the spot in the code that outputs N/size with X and Y, it shows the X line and then never shows the Y line - this is always somewhere between 480-490 of the 500). When it hangs, there are no DB locks, there are no active DB calls, there is no CPU load, and I can leave it overnight and nothing ever happens. It feels like maybe a thread deadlock? I have no idea.

  3. crashes with (the number varies, but the text does not):

    Duplicate large block deallocation

    [2] 33314 abort ./test

  4. crashes with something like this - the length varies (this is a short one), but it always has three “Invalid memory access (signal 11) at address 0x8” lines at the top, always with that exact text and always 0x8. Sometimes the backtraces are much longer; this is the smallest I have seen.

    Invalid memory access (signal 11) at address 0x8
    Invalid memory access (signal 11) at address 0x8
    Invalid memory access (signal 11) at address 0x8
    [0x109c292e6] *Exception::CallStack::print_backtrace:Int32 +38
    [0x109bafd75] __crystal_sigfault_handler +309
    [0x7fff2039ad7d] _sigtramp +29
    [0x109c0ba5c] *DB::PoolPreparedStatement#build_statement:DB::Statement+ +108
    [0x109c263c8] ~procProc(Nil)@test.cr:47 +1288
    [0x109baa4bc] *Fiber#run:(IO::FileDescriptor | Nil) +60

I build with this:
crystal build -Dpreview_mt --no-debug --release test.cr

The shard versions:
db (0.10.0)
pg (0.23.1)
dotenv (0.3.0)

crystal -v
Crystal 0.36.1 (2021-02-02)

LLVM: 11.0.1
Default target: x86_64-apple-macosx

I haven’t recently tested this on Ubuntu, but a week or two ago it was doing this on my Linux box as well. My Mac has 8 cores and 32GB RAM; the Linux box has 64 cores and 192GB RAM. I have watched with htop on both and never seen it get close to saturating even 4 cores, and it doesn’t show a RAM issue on either.

I have full control of the DB, so I can adjust any parameters on it, but I see no sign of it being stressed during any of this.

I have played with the parameters in the connection string and nothing seems to help this.

Probably stating the obvious, but to show that the DB part (and the resources involved with it) is what matters: if I call boo instead of woo it always returns in full in less than a second, every single time - even though the numeric output is the same.

Code:

require "dotenv"
require "db"
require "pg"

Dotenv.load

DB_CONNECTION_STRING = "postgres://#{ENV["DB_USERNAME"]}:#{ENV["DB_PASSWORD"]}@#{ENV["DB_HOST"]}:#{ENV["DB_PORT"]}/#{ENV["DB_NAME"]}?max_pool_size=200&initial_pool_size=200&max_idle_pool_size=200&retry_attempts=30&checkout_timeout=60"
TEST_DB = DB.open(DB_CONNECTION_STRING)

class A

  def go
    numbers_hash = Hash(Int32, Int32).new
    while numbers_hash.size < 500
      x = (3..12056).sample
      numbers_hash[x] = x * 10
    end
    channel = Channel(UInt8).new
    numbers_hash.each do |a, b|
      spawn do
        B.new.woo(a, b)
        channel.send 1_u8
      end
    end
    counter = 0_u64
    numbers_hash.each {
      puts "X: #{counter += 1}/#{numbers_hash.size}"
      channel.receive
      puts "Y: #{counter}/#{numbers_hash.size}"
    }
  end

end

class B

  def boo(a,b)
    test_count = {0,1}.sample
    1_u8
  end

  def woo(a, b)     
    # this SQL only ever returns 0 or 1
    test_sql = "select count(*)::smallint as example_count from my_table where id=$1;"
    test_count = TEST_DB.query_one test_sql, a, as: {Int16}
    1_u8     
  end

end

A.new.go

The table it queries has ~20k entries, and the range of values is mimicked by the random hash builder, so you could build one that way.
After testing, though, this SQL will also do it (so if boo used this instead, it crashes the same way woo does):

test_sql = "SELECT cast(random()*(1-0)+0 as int);"
test_count = TEST_DB.query_one test_sql, as: {Int32}

As far as I understand, DB.open returns a Database instance, which wraps a connection pool. I don’t think the instance is thread-safe when running in preview_mt mode.

If you look at the docs, you’ll see they talk about “checking out” a connection from the pool via DB#checkout or DB#using_connection.

DB.open "mysql://root:root@localhost/test" do |db|
  db.using_connection do |cnn|
    puts typeof(cnn)
    # use cnn
  end
end
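And with DB#checkout the shape is roughly this (just a sketch - a checked-out connection has to be handed back with release):

cnn = db.checkout
begin
  # run queries on cnn
ensure
  cnn.release
end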

Although there is no explicit mention of thread-safety here, I can see comments about “multiple fibers waiting for a resource” in the source code.

So, my first shot at this would be to wrap queries into using_connection blocks, and see if that fixes the issue :crossed_fingers:

2 Likes

Can you still reproduce it if you use the darwin.tar.gz release from GitHub? After downloading and decompressing it you can call bin/crystal to use that version of the compiler.

1 Like

crystal-db should be thread-safe, even when you execute a query directly on the Database instance. The execution logic should internally check out a connection from the pool. Checking out a connection explicitly is mostly useful if you want to run a series of queries in the same scope and re-use the same connection to reduce stress on the pool.
Still, something might be going wrong here. And it’s probably not explicitly tested with multithreading.
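For example (just a sketch, reusing TEST_DB and my_table from the snippets above), this runs two queries on the same checked-out connection:

TEST_DB.using_connection do |cnn|
  total = cnn.query_one "select count(*) from my_table", as: Int64
  ids = cnn.query_all "select id from my_table limit 10", as: Int32
end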

2 Likes

Downloaded the latest, ran into the openssl thing - did a brew install for that (already had it), linked pkg-config - and then it would run. The first time it hung (but earlier than usual - in the 300s rather than the late 400s); the next time it did the “Duplicate large block deallocation” abort. That was with the actual code.

Doing the same with the test code as seen here and using the random SQL (boo): the first time it hung in the 400s, and the next time it crashed with the invalid memory access, signal 11, address 0x8 (3 times).

So yes, can reproduce it with that zip.

I took this:

  def boo(a,b)
    test_sql = "SELECT cast(random()*(1-0)+0 as int);"
    test_count = TEST_DB.query_one test_sql, as: {Int32}
    1_u8      
  end

and switched it to (still in the same context as previously of course):

  def boo(a,b)
    test_sql = "SELECT cast(random()*(1-0)+0 as int);"
    TEST_DB.using_connection do |cnn|
      test_count = cnn.query_one test_sql, as: {Int32}
    end    
    1_u8      
  end

(assuming that is correct?)
Using crystal from the darwin zip and this code, I tried it 10 times and it hangs every time in the late 400s, but I haven’t seen the other crash types (and also haven’t seen a success).

Using the brew crystal I had been using before and this adjustment: exact same behavior (hangs, but no successes or crashes).

Progress?

I think I got it: the PoolPreparedStatement is not thread-safe.

It keeps a set of connections where the command has been prepared, but the access to that set is not thread-safe.
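Just to illustrate the kind of race I mean (a made-up sketch, not the actual crystal-db code): an unguarded check-then-insert on a shared Set can interleave badly when fibers run on multiple threads under -Dpreview_mt, and a Mutex around it is the usual fix.

require "set"

# Hypothetical stand-in for "the set of connections that already prepared a statement"
class StatementCache
  def initialize
    @prepared = Set(Int32).new
    @mutex = Mutex.new
  end

  # Racy under -Dpreview_mt: fibers on different threads can both pass the
  # includes? check and then mutate the Set concurrently, corrupting its state.
  def mark_prepared_unsafe(conn_id : Int32)
    @prepared << conn_id unless @prepared.includes?(conn_id)
  end

  # Guarded version: only one fiber can read or mutate the Set at a time.
  def mark_prepared(conn_id : Int32)
    @mutex.synchronize do
      @prepared << conn_id unless @prepared.includes?(conn_id)
    end
  end
end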

Can you try with &prepared_statements=false as part of the connection string? That would not use prepared statements by default and might unblock you.
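For the connection string in the original snippet, that would look something like:

DB_CONNECTION_STRING = "postgres://#{ENV["DB_USERNAME"]}:#{ENV["DB_PASSWORD"]}@#{ENV["DB_HOST"]}:#{ENV["DB_PORT"]}/#{ENV["DB_NAME"]}?max_pool_size=200&initial_pool_size=200&max_idle_pool_size=200&retry_attempts=30&checkout_timeout=60&prepared_statements=false"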

2 Likes

Gave that a shot and I remain confused (all of these runs have that appended to the end of the connection string):

  1. Shortcut example (just using random, but still hitting the DB), with using_connection - still hangs every time (no crash).

  2. Shortcut example (just using random, but still hitting the DB), with a straight DB.query_one - works about half of the time and runs 2x as slow; the other half it hangs. Doesn’t seem to crash?

  3. Full example (actually counting from the real table), with using_connection - still hangs every time (no crash).

  4. Full example (actually counting from the real table), with a straight DB.query_one - works about 25% of the time, but runs just as fast (and faster than the random one above that never looks at a table). Doesn’t seem to crash?

(ran it 10 times for each of those, which in the past was enough to get a range of crash/hangs)

Not sure that’s intuitive to me.

Is this the sort of thing like “this is a you problem, change your code to only be single-threaded with the DB” and I am just doing it wrong?
Or is this like “seems wrong, will be fixed by 2022, probably not sooner”?
Or something more pressing and I should report it? (and if so, is that on the PG shard?)

Any of those answers are fine; I’m just not sure where this falls, so I can get a feel for how much code I should modify and/or whether I should use another language.

Thank you!

PoolPreparedStatement is a crystal-db feature and it’s definitely supposed to be thread-safe. Please post a bug report here: Issues · crystal-lang/crystal-db · GitHub

1 Like

There is something more bothersome going on here, and I don’t know what yet. I added a mutex in the pool prepared statement at GitHub - crystal-lang/crystal-db at fix/pool-prepared-statement, but some hangs are still happening.

@omgponies I don’t think this is an “it’s your problem” situation, but there are some things that can be done slightly differently in your code that would probably make the issue go away.

In your example you are spawning a new fiber for each entry in the hash. Each of these fibers will try to check out a connection from the pool, so to control how many items you process concurrently you would need to tweak the connection pool directly. What is worse is that if the processing takes time, another waiting fiber could time out while waiting for a connection. Not ideal.

I would suggest decoupling the enqueueing of jobs/items to process from the workers. You can choose how many workers to have independently of how many items you have to process. Each worker will use one connection from the pool, with less chance of timing out (unless the pool size is smaller than the worker count). This also allows you to share the pool for other uses.

Without changing your code too much, this can be achieved with:

WORKERS              =  5
FIXED_POOL_SIZE      = 10
DB_CONNECTION_STRING = "postgres://postgres@localhost/crystal?max_pool_size=#{FIXED_POOL_SIZE}&initial_pool_size=#{FIXED_POOL_SIZE}&max_idle_pool_size=#{FIXED_POOL_SIZE}&retry_attempts=30&checkout_timeout=60"
TEST_DB              = DB.open(DB_CONNECTION_STRING)

TEST_DB.exec "drop table if exists my_table"
TEST_DB.exec "CREATE TABLE my_table (id int)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (1)"
TEST_DB.exec "INSERT INTO my_table (id) VALUES (2)"

class A
  def go
    numbers_hash = Hash(Int32, Int32).new
    while numbers_hash.size < 500
      x = (3..12056).sample
      numbers_hash[x] = x * 10
    end

    jobs = Channel({a: Int32, b: Int32}).new(WORKERS)
    channel = Channel(UInt8).new(numbers_hash.size)

    WORKERS.times do
      spawn do
        while (job = jobs.receive)
          B.new.woo(job[:a], job[:b])
          channel.send 1_u8
        end
      end
    end

    numbers_hash.each do |a, b|
      jobs.send({a: a, b: b})
    end

    counter = 0_u64
    numbers_hash.each {
      puts "X: #{counter += 1}/#{numbers_hash.size}"
      channel.receive
      puts "Y: #{counter}/#{numbers_hash.size}"
    }
  end
end

class B
  def boo(a, b)
    test_count = {0, 1}.sample
    1_u8
  end

  def woo(a, b)
    # this SQL only ever returns 0 or 1
    test_sql = "select count(*)::smallint as example_count from my_table where id=$1;"
    test_count = TEST_DB.query_one test_sql, a, as: {Int16}
    1_u8
  end
end

A.new.go

The channel channel (the results one) needs to be buffered, because otherwise the channel.send 1_u8 calls will get stuck: the workers would block sending results before the main fiber starts receiving, and then no more jobs would be pulled from jobs.
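If you also want the worker fibers to finish cleanly once the queue is drained (instead of blocking forever in jobs.receive), one option - just a sketch on top of the code above - is to close jobs after enqueueing everything and have the workers use receive?, which returns nil once the channel is closed and empty:

numbers_hash.each do |a, b|
  jobs.send({a: a, b: b})
end
jobs.close

# and the worker loop becomes:
WORKERS.times do
  spawn do
    while job = jobs.receive? # nil once jobs is closed and drained
      B.new.woo(job[:a], job[:b])
      channel.send 1_u8
    end
  end
end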

1 Like

Thank you so much! I will implement this in my actual code tomorrow.
The feedback is very much appreciated.