Fiber usage in high IO application

I have a small problem with Fibers in one of our projects when handling thousands of TCP connections where I have a reader and writer spawned for each TCP socket.
This ends up chewing up a lot of memory on stack space for 99% idle CPU.

Any ideas on how I could handle this?

Maybe something like a dispatch class where a single fiber waits on all the file descriptors for IO read
and then performs the read in a spawned fiber?
Question is, how would I push new connections into the pool? i.e. wake up fiber, handle fact that nothing is readable, add new connection and then wait for readable again

1 Like

That’s an interesting use case. I don’t see any immediate action to improve this, however there are a couple of angles we could consider.

A relatively simple hack would be to use smaller stacks for these kind of fibers. I suppose they are typically quite shallow and wouldn’t nearly need the 8MB default stack. Currently, this is not configurable. But it could be entirely possible to allow customization of fiber stacks.
This could be a consideration with the proposal of execution contexts (RFC #2).

Another angle for this kind of mass handling of identical sockets could indeed be an API that’s more like select and can wait on many sockets, handling all of them in the same way as they receive data.
The IO implementation would essentially be detached from Crystal’s main event loop.

1 Like

Configuring the default stack size and/or specifying the stack size for each fiber sounds good. We might want to drop Fiber::StackPool though, as having it handling too many stack sizes could factor out the benefits of recycling the stacks.

Ideally we’d calculate the stack size of any function, where recursive would be infinite, and allocate a perfectly sized stack, but
 can’t know the stack size of external calls (syscalls, C functions) :cry:

Could we start by allowing a spawn with an experimental runtime size of stack specified at call site to see if that works?

It could be monkey patched as long as Fiber.run is redefined to avoid sending the stack to the pool. Or having a custom stack pool for smaller fiber’s stacks.

If this proves useful we might want to make fiber’s stack pool an instance variable and allow passing the stack pool when spawning.

I’m not sure a fiber stack size customization will change much as the OS lazily allocates the memory - so it’s typically only 4kb of physical memory is being allocated per fiber.
Fiber stacks account for about half of the memory usage (~100mb)

Most of the memory usage is attributable to structures and classes managing each connection, but I figure this could be reduced greatly too if there was a way to mass handle sockets

I know 100mb doesn’t sound like much, but that represents a single client and one of many processes in the system - we do implement sharding so we can distribute the load across machines but it would be nice to constrain memory growth as much as possible

1 Like

Allocating virtual memory ain’t having much effect since it’s not physically allocated
 yet, there’s still an upper limit: RLIMIT_AS restricts how much virtual memory can be allocated by a single process.

But indeed, this isn’t your issue :+1:

What you describe sounds like introducing a type that would wrap epoll (linux), kqueue (bsd/mac) or WSAPoll (win).

1 Like

We might want to drop Fiber::StackPool though, as having it handling too many stack sizes could factor out the benefits of recycling the stacks.

Or perhaps have a separate stackpool per execution context. It seems likely that fibers in a context would have similar needs.

1 Like

Out of curiosity, is the following a good approximation of what you’re dealing with?

  • a server spawns 2 fibers for each open TCP socket
  • the TCP sockets - and hence the related fibers - are long-lived and in the thousands
  • the ulimit for the server is set to some very high number
require "socket"

def writer(client : TCPSocket, pipe : Channel(String))
  loop do
    message = client.gets
    pipe.send message || ""
  end
end

def reader(pipe : Channel(String))
  loop do
    message = pipe.receive
    puts "#{Fiber.current.name}: #{message}"
  end
end

server = TCPServer.new("localhost", 1234)

i = 0
while client = server.accept?
  pipe = Channel(String).new
  spawn writer(client, pipe)
  spawn(name: "r_#{i}") { reader(pipe) }
  i += 1
end

I’m asking because in that case, your idea of having a dispatcher spawn short-lived fibers on data received would probably halve the amount of allocated memory - assuming I got your scenario right :sweat_smile:

1 Like

Yep! This is exactly the scenario - would love to hear the idea :slight_smile:

This is an interesting use case for exposing the events and waiting parts of the event loop, where we don’t associate a specific fiber to one event (one shot) but instead have one waiting fiber for many events (regular wait/resume).

2 Likes

The equivalent GoLang library for something similar to this is: GitHub - panjf2000/gnet: 🚀 gnet is a high-performance, lightweight, non-blocking, event-driven networking framework written in pure Go./ gnet æ˜Żäž€äžȘé«˜æ€§èƒœă€èœ»é‡çș§ă€éžé˜»ćĄžçš„äș‹ä»¶é©±ćŠš Go çœ‘ç»œæĄ†æž¶ă€‚
I asked ChatGPT how to do something similar in crystal and it too mentioned leveraging libevent which crystal uses.

That said I really don’t like the gnet approach of performance over other considerations and think we can probably come up with something better. If we can provide memory and performance improvements without throwing out the rest of the std lib then that would be a big win

This is a relevant article: A Million WebSockets and Go

Here’s what I was thinking

require "socket"

def socket_manager(client : TCPSocket, pipe : Channel(String))
  loop do
    message = client.gets
    pipe.send message || ""
  end
end

def dispatcher(pipe : Channel(String))
  loop do
    message = pipe.receive
    spawn worker(message)
  end
end

def worker(message : String)
  sleep rand(10)
  puts "#{Fiber.current.name}: #{message}"
end
server = TCPServer.new("localhost", 1234)

pipe = Channel(String).new(capacity: 100)
spawn dispatcher(pipe)

while client = server.accept?
  spawn socket_manager(client, pipe)
end

Now, depending on the volume of messages, this could end up being more memory hungry than the previous implementation - because the number of ephemeral workers you spawn could be higher than the number of open sockets at any given time - but it could also pay off if

  1. workers are really short-lived
  2. message traffic is not so high as to lead to scenarios where the number of workers exceeds the number of TCP sockets

Other things to experiment with:

  • play with the capacity of the pipe channel: smaller values basically act as back-pressure mechanism to constrain the number of concurrent workers
  • if for any reason a single pipe channel appears to be a bottleneck, instantiate 2 or 3 at boot time and round robin their assignment to socket_manager
  • switch from ephemeral workers to a set number of workers defined at boot time - again, this will limit the number of messages that can be processed at once, but that’s not necessarily bad

Finally, you could consider solutions where a tcp socket pool is initialised - i.e. instead of initialising a single fiber per socket, you initialise a predefined number of socket managers. But this is where I think what @straight-shoota mentioned would be great to have:

Another angle for this kind of mass handling of identical sockets could indeed be an API that’s more like select and can wait on many sockets, handling all of them in the same way as they receive data.
The IO implementation would essentially be detached from Crystal’s main event loop.

Hope at least some of this is of help :grin:

4 Likes

Yeah it’s a good idea, for most of the time connections are idle so this will put a dent in the memory usage - thanks