The Crystal Programming Language Forum

Channel.receive without blocking another thread

Ok it’s been hours of experimenting and I’m still no closer. I need someone like @asterite, @bew, or @straight-shoota who is more familiar with concurrency to help me out if you could. Basically this is the code giving me issues:

def broadcast(query, timeout = 10) forall U
  channel = Channel(Types::Base).new
  extra = Random.new.hex(16)
  query = query.to_h
    .transform_keys(&.to_s)
    .merge({"@extra" => extra})

  @update_manager << UpdateHandler(Types::Base).new(extra: extra, disposable: true) do |update|
    spawn channel.send(update)
  end

  API.client_send(@td_client, query)
  channel.receive
end

The full code for the relevant areas can be found here and here. Basically what I have are two, or more, threads. In one I have the update loop which is constantly checking for updates from the server and handling them via UpdateHandlers. The above method is intended to allow a “synchronous” send and receive. The @extra field is a a unique identifier allowing you to check for a matching response.

The problem is that the final channel.receive at the bottom seems to block the update thread, so it never receives because the send never happens. What I need is a way to receive without blocking that thread, or maybe I’m thinking about the problem the wrong way. Idk.

Maybe try to reduce the code to something that isn’t related to this domain? What I mean is, if you provide a snippet of, say, 100 lines of code that we can see and run altogether where you say what you want and why it’s not working as you want it to might be better for us to help you.

I’ve been trying to think of a minimal example of what I need. Let me see if I can come up with something.

Hopefully this illustrates what I’m trying to accomplish. On carc.in it won’t run, because it’s basically an infinite loop, but the idea is the same. I need an event loop which is constantly fetching new data, and I need to be able to add handlers that listen for specific things and return them, without stopping the loop. https://carc.in/#/r/84xg

Thank you!

To avoid blocking the run loop you just need to do:

def run
  spawn do
    loop do
      # ...
    end
  end
end

Then this:

spawn channel.send(i)

should simply be:

channel.send(i)

I don’t understand why you want to send something inside a separate spawn.

I don’t understand why you want to send something inside a separate spawn .

Just read something in the book that said channel.send was blocking and that it needed to be sent in its own fiber.

Anyway, what about making sure that spawn with the loop inside it gets executed? I’ve tried Fiber.yield, but it just exists after the first item gets sent, and I’ve tried sleep, and it works, but then the channel.receive ends up blocking it.

Yes, that makes sense, but it’s probably better to spawn an entire fiber to handle each job, then after it’s done do channel.send. That is, do all of that inside a spawn, otherwise the job handling will block the loop fiber.

The basic idea is to have a loop that spawns a fiber for each thing you want to process, and send something over a channel after each item is done. On another side (another spawn, or the main fiber) you do channel.receive to await completion of those jobs.

Doing all of this manually is pretty tedious, repetitive, boring and bug-prone. The standard library should provide abstractions for this but right now that’s not the case.

Since I never actually did something like the above I can’t comment much more about this, but maybe others already implemented similar things and can provide code, or maybe there’s a shard for this already.

1 Like

Thanks for your help :) I’ll see what I can do

Well it’s not perfect, but this is what I ended up with

def broadcast(return_type : U.class, query, timeout = 10) : Concurrent::Future(U) forall U
  Concurrent::Future(U).new do
    channel = Channel(U).new
    extra = Random.new.hex(16)
    query = query.to_h
      .transform_keys(&.to_s)
      .merge({"@extra" => extra})

    @update_manager << UpdateHandler(U).new(extra: extra, disposable: true) do |update|
      channel.send(update)
    end

    API.client_send(@td_client, query)
    channel.receive
  end
end

Still need to figure out the timeout part, but this seems to work. I just have to call the broadcast mehthod inside of a spawn, which isn’t ideal but will work for now