Fiber usage in high IO application

Here’s what I was thinking:

require "socket"

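# Forwards each line received from one client to the shared pipe.
def socket_manager(client : TCPSocket, pipe : Channel(String))
  # gets returns nil once the client disconnects: stop reading instead of
  # looping forever and flooding the pipe with empty strings
  while message = client.gets
    pipe.send message
  end
ensure
  client.close
end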

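# Spawns one short-lived worker fiber per incoming message.
def dispatcher(pipe : Channel(String))
  count = 0
  loop do
    message = pipe.receive
    count += 1
    # Name the fiber, otherwise Fiber.current.name is nil inside the worker
    spawn(worker(message), name: "worker-#{count}")
  end
end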

# Simulates up to 9 seconds of work, then logs which fiber handled the message.
def worker(message : String)
  sleep rand(10).seconds
  puts "#{Fiber.current.name}: #{message}"
end

server = TCPServer.new("localhost", 1234)

pipe = Channel(String).new(capacity: 100)
spawn dispatcher(pipe)

while client = server.accept?
  spawn socket_manager(client, pipe)
end

Now, depending on the volume of messages, this could end up being more memory hungry than the previous implementation, because the number of ephemeral workers you spawn could exceed the number of open sockets at any given time. It could also pay off, though, if:

  1. workers are really short-lived
  2. message traffic stays low enough that the number of live workers rarely exceeds the number of open TCP sockets
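Whether condition 2 holds is easy to check empirically. Below is a minimal sketch, assuming you are fine adding a small gauge around the worker body; `WorkerGauge` and `track` are hypothetical names, not something from the code above. It counts live workers with `Atomic(Int32)` and remembers the peak; the same kind of counter could wrap `socket_manager` to track open sockets for comparison.

```crystal
# Hypothetical instrumentation (not part of the original code): count how many
# ephemeral workers are alive at once and remember the peak.
class WorkerGauge
  def initialize
    @live = Atomic(Int32).new(0)
    @peak = Atomic(Int32).new(0)
  end

  # Runs the block as one "worker", counting it as live while it runs.
  def track(&)
    live = @live.add(1) + 1 # Atomic#add returns the previous value
    @peak.max(live)         # keep the highest concurrency seen so far
    begin
      yield
    ensure
      @live.sub(1)
    end
  end

  def live : Int32
    @live.get
  end

  def peak : Int32
    @peak.get
  end
end

gauge = WorkerGauge.new
done = Channel(Nil).new

# Simulate a burst of 5 messages, each handled by its own short-lived worker.
5.times do
  spawn do
    gauge.track { sleep 10.milliseconds }
    done.send(nil)
  end
end
5.times { done.receive }

puts "peak concurrent workers: #{gauge.peak}, still live: #{gauge.live}"
```

In the real server you would call `gauge.track { ... }` inside `worker` and log the peak periodically.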

Other things to experiment with:

  • play with the capacity of the pipe channel: once the buffer is full, socket_manager blocks on send, so smaller values act as a back-pressure mechanism. Note that the dispatcher above spawns a worker per message without waiting, so on its own this caps the number of concurrent workers only when combined with a fixed worker pool (last point below)
  • if for any reason a single pipe channel turns out to be a bottleneck, create 2 or 3 at boot time and assign them to socket_manager fibers round-robin
  • switch from ephemeral workers to a fixed number of workers started at boot time; again, this limits how many messages can be processed at once, but that’s not necessarily bad
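Here is a rough sketch of the last two points combined: a few pipes assigned round-robin, each drained by a fixed pool of long-lived workers, with a small channel capacity providing back-pressure. The sizes (`PIPE_COUNT`, `WORKERS_PER_PIPE`, `PIPE_CAPACITY`) are made-up values for illustration, and in-memory "connections" stand in for sockets so it runs standalone.

```crystal
# Illustrative sizes; tune them for your own traffic.
PIPE_COUNT           = 2
WORKERS_PER_PIPE     = 3
PIPE_CAPACITY        = 4 # send blocks once this many messages are buffered
CONNECTIONS          = 4
LINES_PER_CONNECTION = 3

pipes = Array.new(PIPE_COUNT) { Channel(String).new(capacity: PIPE_CAPACITY) }
results = Channel(String).new

# Fixed pool: a set number of long-lived workers per pipe, started at boot time.
pipes.each_with_index do |pipe, p|
  WORKERS_PER_PIPE.times do |w|
    spawn(name: "pipe#{p}-worker#{w}") do
      # receive? returns nil once the channel is closed and drained
      while message = pipe.receive?
        results.send "#{Fiber.current.name}: #{message}"
      end
    end
  end
end

# Round-robin: connection n is bound to pipes[n % PIPE_COUNT].
# Each fake connection is just a fiber sending a few lines.
CONNECTIONS.times do |n|
  pipe = pipes[n % PIPE_COUNT]
  spawn do
    LINES_PER_CONNECTION.times { |i| pipe.send "conn#{n} line#{i}" }
  end
end

received = Array.new(CONNECTIONS * LINES_PER_CONNECTION) { results.receive }
pipes.each &.close
received.each { |line| puts line }
```

With a real server, the accept loop would pick `pipes[n % PIPE_COUNT]` for the n-th accepted client and pass it to `socket_manager`, and the dispatcher would no longer be needed.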

Finally, you could consider solutions where a TCP socket pool is initialised, i.e. instead of spawning one fiber per socket, you start a predefined number of socket managers. But this is where I think what @straight-shoota mentioned would be great to have:

Another angle for this kind of mass handling of identical sockets could indeed be an API that’s more like select and can wait on many sockets, handling all of them in the same way as they receive data.
The IO implementation would essentially be detached from Crystal’s main event loop.
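For the socket-manager pool idea, a minimal sketch could look like this, assuming accepted connections are handed to the pool through a channel (`clients` and `MANAGER_COUNT` are hypothetical names). Each manager serves one connection until EOF and then takes the next one; without a select-like API it cannot watch several sockets at once, which is the limitation the quote above is about. `IO::Memory` objects stand in for `TCPSocket`s so the sketch runs without a network.

```crystal
# Illustrative pool size.
MANAGER_COUNT = 2

clients = Channel(IO).new
pipe = Channel(String).new(capacity: 100)

MANAGER_COUNT.times do |id|
  spawn(name: "manager-#{id}") do
    # Each manager handles one connection at a time until the channel closes.
    while client = clients.receive?
      while line = client.gets
        pipe.send line
      end
      client.close
    end
  end
end

# In the real server the accept loop would do `clients.send(client)` instead
# of spawning a fiber per socket. Here three in-memory IOs act as connections.
3.times { |n| clients.send IO::Memory.new("hello from #{n}\nbye from #{n}\n") }
clients.close

received = Array.new(6) { pipe.receive }
received.each { |line| puts line }
```

The trade-off is that clients beyond `MANAGER_COUNT` wait until a manager frees up, so this only makes sense when connections are short-lived.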

Hope at least some of this is of help :grin:
