I’m currently implementing daily import jobs against several APIs using Crystal (no need to write Java, and it saves some resources, right?).
While I would like to use concurrency and send several requests at the same time, I do not want to overload the API endpoints, so I want to cap the number of concurrent requests. What would be a good approach to that?
The naive solution would be to have a list of items to process, shift n elements off that list, spawn n fibers, and use a channel to wait until all of them have returned (sketched further down). That, however, means each batch waits for its slowest fiber. Another idea would be to have a block like
parallelize(10) do |e|
  # HTTP client request here
end
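For illustration, a minimal sketch of how such a helper could work (parallelize is just a made-up name here, not an existing API): a buffered channel acts as a counting semaphore to cap the number of in-flight fibers, and a second channel is used to wait for all of them.

# Hypothetical helper: runs the block for every item with at most
# `limit` fibers in flight at any time.
def parallelize(items : Array(T), limit : Int32, &block : T ->) forall T
  semaphore = Channel(Nil).new(limit) # buffered channel used as a counting semaphore
  done = Channel(Nil).new

  items.each do |item|
    semaphore.send(nil) # blocks once `limit` fibers are already running
    spawn do
      begin
        block.call(item)
      ensure
        semaphore.receive # free the slot
        done.send(nil)
      end
    end
  end

  items.size.times { done.receive } # wait for every fiber to finish
end

parallelize((1..30).to_a, 10) { |e| puts e }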
This would require some more synchronization, though (roughly what the sketch above does). Another idea could be one of the rate-limiter shards, but most of them are time-based (which makes sense for rate limiting, e.g. with a token bucket) rather than capping the number of concurrent requests.
I suspect I am overcomplicating this and there is something simpler, so I'm happy to hear better ideas.
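For reference, a minimal sketch of the batch approach mentioned first; fetch is only a placeholder for the real HTTP request:

# Placeholder for the real API call.
def fetch(item : Int32) : Int32
  item * 2
end

items = (1..100).to_a
batch_size = 10
done = Channel(Nil).new

items.each_slice(batch_size) do |batch|
  batch.each do |item|
    spawn do
      fetch(item)
      done.send(nil)
    end
  end
  # Wait for the whole batch; the next one starts only after the slowest fiber.
  batch.size.times { done.receive }
end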
module Indexable(T)
  # Maps over the collection with a fixed pool of `workers` fibers,
  # so at most `workers` calls to the block run concurrently.
  def fiberpool_map(workers : Int, &func : T -> R) forall R
    Array(R).build(size) do |result|
      jobs = Channel({Int32, T}).new
      done = Channel(Nil).new

      # Worker fibers: pull jobs until the channel is closed, then signal `done`.
      workers.times do
        spawn do
          while work = jobs.receive?
            i, input = work
            result[i] = func.call(input)
          end
          done.send(nil)
        end
      end

      # Feed every element together with its index to the pool.
      each_with_index do |x, i|
        jobs.send({i, x})
      end
      jobs.close

      # Wait until all workers have finished, otherwise `Array.build`
      # would return before the last results are written.
      workers.times { done.receive }

      # `Array.build` expects the block to return the number of elements written.
      size
    end
  end
end

p (1..100).to_a.fiberpool_map(5, &.**(3))
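Applied to the import use case, the call site could then look roughly like this, using the fiberpool_map defined above (the URLs are made up; HTTP::Client is the standard-library client):

require "http/client"

# Hypothetical endpoints for the daily import.
urls = [
  "https://api.example.com/items?page=1",
  "https://api.example.com/items?page=2",
  "https://api.example.com/items?page=3",
]

# At most 5 requests are in flight at any time.
bodies = urls.fiberpool_map(5) { |url| HTTP::Client.get(url).body }
p bodies.map(&.size)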