According to htop, this pegs CPU usage at 400% while doing zero work:
# Run with:
# crystal run -Dpreview_mt threaded_workers.cr
def start_worker!
  request_channel = Channel(String).new
  Thread.new do
    loop do
      request = request_channel.receive
    end
  end
end

puts "Starting..."
4.times { start_worker! }
puts "Workers started. Sleeping until you press enter... (Check htop for how much CPU I'm using!)"
STDIN.gets
This is the simplest example I can come up with right now. When I run this with Crystal 1.7.2 [29f9ac503] (2023-01-23), LLVM 13.0.1, default target x86_64-unknown-linux-gnu, and watch htop in another terminal, I see 400% CPU usage. I expect approximately 0% CPU usage.
(My actual use case is a ThreadedDNSResolver with a dedicated thread pool just for DNS resolution, calling Socket::Addrinfo.tcp(...) from each of the separate worker threads, and using Channel to pass requests and responses back and forth to the main thread, with CRYSTAL_WORKERS=1. This actually appears to be working, but it keeps running at 400% CPU at idle with 4 Thread.new worker threads. I've distilled the issue down to the tiny example above.)
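Roughly, that use case looks like the sketch below. The names and shapes are illustrative (not the real ThreadedDNSResolver code), but it's the same arrangement: raw threads doing blocking lookups, with Channels carrying requests in and results out.

require "socket"

# Hypothetical sketch of the use case: dedicated threads do the blocking
# getaddrinfo work, Channels carry requests/responses to the main thread.
record Request, host : String, reply : Channel(Array(Socket::Addrinfo))

requests = Channel(Request).new

4.times do
  Thread.new do
    loop do
      req = requests.receive                              # this receive is what spins at idle
      req.reply.send Socket::Addrinfo.tcp(req.host, 443)  # blocking lookup, off the main thread
    end
  end
end

# On the main (scheduler-aware) side, a fiber resolves by round-tripping a request:
reply = Channel(Array(Socket::Addrinfo)).new
requests.send Request.new("crystal-lang.org", reply)
puts reply.receive.first.ip_address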
This seems to be related to Channel#receive, because if I replace that line with just sleep(1) then there is zero CPU usage. But that's not too useful, of course.
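For reference, that substitution is just:

def start_worker!
  request_channel = Channel(String).new
  Thread.new do
    loop do
      sleep(1) # replacing request_channel.receive with this drops idle CPU to ~0%
    end
  end
end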
Any ideas? Am I doing something wrong or is this expected behavior? Thanks!
Yes, my reason is that the underlying LibC.getaddrinfo call blocks the entire thread, which would block any Fibers that could otherwise be doing other work. My intention was that by creating new Threads only for DNS resolution, no ordinary Fibers would be blocked by this system call (except the one Fiber waiting on a Channel for the response).
Gotcha, I imagine that’s because C functions aren’t aware of the Crystal event loop. Maybe others will have some thoughts on this. E.g. if there’s a better way that doesn’t involve using a private API or something.
OK thanks. Maybe there is a solution involving select, or possibly just not using Channel#receive but finding another way to pass requests and results between threads. Will keep trying…
Surprisingly, my ThreadedDNSResolver.resolve seems to work just fine with substantial speedup over conventional Socket::Addrinfo.resolve, but unfortunately has this undesirable side effect of total CPU consumption at idle as illustrated here. Spin locks?
Thread is currently an internal API on purpose. It is not intended to be used the way you're using it.
These manually created threads do not have a scheduler configured. This effectively means anything involving scheduling (e.g. receiving from a channel) probably won’t work as intended.
If you want threads for blocking activities, the best bet is a dedicated thread pool, so those activities don't block the normal worker threads either.
This is an implementation that never relies on Crystal’s scheduling and so it should work for you.
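As a rough illustration of that idea (just a sketch, not the implementation linked above): a pool whose queue is guarded by the internal Thread::Mutex / Thread::ConditionVariable primitives instead of Channel, so nothing in it touches Crystal's scheduler. Note that these are internal APIs too, so treat this as illustrative only.

class BlockingPool
  def initialize(size : Int32)
    @mutex = Thread::Mutex.new
    @cond = Thread::ConditionVariable.new
    @jobs = Deque(Proc(Nil)).new
    size.times do
      Thread.new do
        loop do
          job = nil.as(Proc(Nil)?)
          @mutex.synchronize do
            while @jobs.empty?
              @cond.wait(@mutex) # parks the OS thread; no busy-waiting
            end
            job = @jobs.shift
          end
          job.not_nil!.call
        end
      end
    end
  end

  # Enqueue a blocking job (e.g. a getaddrinfo call) for the pool.
  def submit(&job : -> Nil) : Nil
    @mutex.synchronize do
      @jobs << job
      @cond.signal
    end
  end
end

Handing a result back to a fiber on the main thread would still need something the scheduler understands (a pipe or socket, as discussed below), since Channel can't safely be used from these threads.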
If you want another (possibly hacky) option, I use a socket to communicate between 2 brain halves of the same app in a loosely related problem.
UNIXSocket.open("/tmp/gpio_detector.sock") do |sock|
  sock.puts "Event"
end
and then use normal Crystal fibers to listen on the socket without trouble.
Probably not as clean, but I don’t think it’s too terrible a way to do it.
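For context, the listening half can be plain fiber code. A sketch using the same socket path (the per-line handling is made up):

require "socket"

# Handle each connection in an ordinary fiber, reading one line at a time.
def handle_client(client : UNIXSocket)
  while line = client.gets
    puts "event received: #{line}" # react to "Event" messages here
  end
end

server = UNIXServer.new("/tmp/gpio_detector.sock")
while client = server.accept?
  spawn handle_client(client)
end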
Looking at the source for Socket::Addrinfo#resolve I see that it is using LibC#getaddrinfo. I’m curious, what are you doing differently in your resolver that is running faster?
@nogginly Yes still using LibC#getaddrinfo, but (I think?) with multiple threads, so faster in terms of throughput, not latency of any one lookup.
@oprypin Very interesting, though it seems that it relies on Thread.join, which would block the main thread; I don't think I'd want that in my use case.
@mavu This is interesting and may be a way to use a socket to communicate between the threads! Are you using the UNIXSocket to communicate within a single process, or between multiple processes? Is the sock.puts itself relying on IO::Evented which presumably relies on a scheduler?
@oprypin's and @mavu's suggestions combined in my mind, and I wondered if there's a different way to communicate across the threads that doesn't use the scheduler on the custom thread side, but does use the scheduler on the main thread side… This inspired me to try IO.pipe to communicate to/from the thread: from the special thread, only using LibC.read and LibC.write calls on the blocking Int32 #fd; on the main thread side, using ordinary #write/#gets for evented IO on the non-blocking side of the pipes. This toy example appears to be working:
require "json"

thread_input_reader, thread_input_writer = IO.pipe(read_blocking: true, write_blocking: false)
thread_output_reader, thread_output_writer = IO.pipe(read_blocking: false, write_blocking: true)

th = Thread.new do
  thread = Thread.current
  input_fd = thread_input_reader.fd
  output_fd = thread_output_writer.fd
  bufsize = 64
  buffer = Bytes.new(bufsize)
  loop do
    bytes_read = LibC.read(input_fd, buffer, bufsize)
    break if bytes_read == 0 # EOF
    message = String.new(buffer[0...bytes_read])
    result = String.build do |str|
      str << JSON.build do |json|
        json.object do
          json.field "thread", thread.object_id
          json.field "message", message
        end
      end
      str << "\n"
    end
    result_slice = result.to_slice
    LibC.write(output_fd, result_slice, result_slice.size)
  end
end

n = 1000000
start_time = Time.monotonic
n.times do |i|
  line_sent = "Hello, world! #{i}"
  thread_input_writer.write((line_sent + "\n").to_slice)
  thread_input_writer.flush
  line_received = thread_output_reader.gets.not_nil!
  line_expected = "{\"thread\":#{th.object_id},\"message\":\"#{line_sent}\\n\"}"
  if line_received != line_expected
    puts "received=#{line_received}"
    puts "expected=#{line_expected}"
    raise "Doesn't match"
  end
end
end_time = Time.monotonic
dt = (end_time - start_time).to_f
ips = n.to_f / dt
puts "n=#{n} dt=#{dt} IPS=#{ips}"
Regardless, I may drop the Thread approach and just run a pool of separate processes which do only name resolution, using Process.new and connecting to their stdin/stdout for simplicity.
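A rough sketch of that alternative, assuming a hypothetical dns_worker binary that reads one hostname per line on stdin and prints one result per line on stdout; Process.new with piped stdio gives the parent ordinary evented IO:

# Spawn one worker process and talk to it over its stdin/stdout pipes.
worker = Process.new("./dns_worker",
  input: Process::Redirect::Pipe,
  output: Process::Redirect::Pipe)

worker.input.puts "crystal-lang.org"
worker.input.flush
puts worker.output.gets # whatever the worker prints, e.g. an IP address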
Just to give an update: I built this out fully, with a ThreadedDNSResolver::Worker that uses a pipe (as shown in the previous message) as a Channel-like mechanism: the main thread sends a single byte to tell the special worker thread that new work is available, and the worker thread sends a single byte back to tell the main thread that the result is available.
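Condensed, that signalling scheme looks roughly like this (names are hypothetical, not the actual ThreadedDNSResolver code); the payload stays in shared variables and each pipe only carries a wake-up byte:

require "socket"

work_reader, work_writer = IO.pipe(read_blocking: true, write_blocking: false)
done_reader, done_writer = IO.pipe(read_blocking: false, write_blocking: true)

request = ""
response = nil.as(Array(Socket::Addrinfo)?)

Thread.new do
  buf = Bytes.new(1)
  loop do
    break if LibC.read(work_reader.fd, buf, 1) <= 0 # block until "work available"
    response = Socket::Addrinfo.tcp(request, 80)    # blocking getaddrinfo, off the main thread
    LibC.write(done_writer.fd, buf, 1)              # signal "result available"
  end
end

signal_byte = Bytes.new(1)
request = "crystal-lang.org"
work_writer.write(signal_byte) # evented write from the main thread wakes the worker
work_writer.flush
done_reader.read(signal_byte)  # evented read: only this fiber waits for the result
puts response.not_nil!.first.ip_address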
This may be a relatively simple way to get around the issue of thread-blocking system calls, for getaddrinfo or other cases?
ThreadedDNSResolver(max_threads=1): 8705.3 resolves per second
ThreadedDNSResolver(max_threads=10): 45731.6 resolves per second
The latter two used 20 spawned Fiber-based consumers, with CRYSTAL_WORKERS=1.
The overhead of the ThreadedDNSResolver with max_threads=1 is only a 1.2% loss in performance compared to the conventional thread-blocking loop. (However, even this would have the advantage of not blocking other Fibers in the main thread!)
When allowed to go up to 10 threads for simultaneous address resolution, I get 5.19x the throughput of name resolution. (And again, the advantage that other Fibers busy with HTTP::Client work for example will be unaffected.)
Will be deploying this soon for Heii On-Call as we add monitoring of your HTTP endpoints to the existing alerting/on-call rotation product. (Currently live but hidden behind a feature flag… let me know if you want to try it early!)