Trying to understand Crystal concurrency

I see some WebSocket examples using Kemal, but thread safety seems to be ignored in them:

    require "kemal"

    SOCKETS = [] of HTTP::WebSocket

    ws "/chat" do |socket|
      # Add the client to SOCKETS list
      SOCKETS << socket

      # Broadcast each message to all clients
      socket.on_message do |message|
        SOCKETS.each { |socket| socket.send message }
      end

      # Remove clients from the list when it's closed
      socket.on_close do
        SOCKETS.delete socket
      end
    end

    Kemal.run

This may be a Kemal-specific question, but I can't visualize what runs on which fiber. My guess is that each ws connection gets its own fiber and the callbacks run in that fiber. Alternatively, there could be a short-lived fiber for each connection start, message receipt, and connection close.

Either way, every fiber can add sockets to and remove sockets from the global SOCKETS list, so it seems this only works because I haven't enabled the multi-threading option.

Also, I'm trying to set up a system where a channel accumulates messages from WebSocket connections and another fiber empties out that message list, does some more work, and then comes back to the start of its loop. But once the messages run out, the fiber blocks. I've read that I can put that message handler on a separate fiber, but then those fibers would be accessing the same memory.
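A minimal sketch of one way around the blocking problem, assuming the game loop only needs whatever input has arrived so far: the socket fibers write into a buffered channel, and the loop empties it with a non-blocking `select ... else`. The `drain` helper and the `inbox` channel are hypothetical names, not part of Kemal or Crystal's standard library.

```crystal
# Hypothetical helper (not in Crystal's stdlib): returns every message that
# is already waiting in `inbox`, without blocking when the channel is empty.
def drain(inbox : Channel(String)) : Array(String)
  batch = [] of String
  loop do
    select
    when msg = inbox.receive
      batch << msg
    else
      # Nothing is ready right now, so hand back what we have
      # instead of parking the game-loop fiber.
      return batch
    end
  end
end

# A buffered channel lets WebSocket fibers enqueue input
# without waiting for the game loop to pick it up.
inbox = Channel(String).new(64)
3.times { |i| inbox.send("move #{i}") }

frame_input = drain(inbox)
puts frame_input.size    # => 3
puts drain(inbox).empty? # => true
```

Because only the game-loop fiber calls `drain`, the array it returns is never shared with another fiber, so it can be processed without a lock.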

Coming from C++, I'm enjoying how fast I can get things up and running in Crystal, and I hope it keeps gaining momentum!

Hi eatjason! Welcome to the forums.

Multithreading was released as an experimental feature just a couple of minor versions ago, and the Kemal example was written without multithreading in mind. Like Kemal, existing programs may need to be reviewed so that shared data is either protected by a mutex or communicated through channels.

In the ws handler I imagine the block runs in a separate fiber, so SOCKETS must be synchronized with a mutex.
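A sketch of what that could look like, assuming a plain `Mutex` from the standard library. `Registry` is a hypothetical wrapper standing in for the SOCKETS array; in the Kemal example `T` would be `HTTP::WebSocket`, but `String` is used here so the snippet runs on its own.

```crystal
# Hypothetical wrapper around a shared array; every access goes through the mutex.
class Registry(T)
  def initialize
    @items = [] of T
    @mutex = Mutex.new
  end

  def add(item : T) : Nil
    @mutex.synchronize { @items << item }
  end

  def remove(item : T) : Nil
    @mutex.synchronize { @items.delete(item) }
  end

  # Copy under the lock, then iterate outside it, so a slow send
  # doesn't hold the lock while other fibers wait to add or remove.
  def snapshot : Array(T)
    @mutex.synchronize { @items.dup }
  end
end

registry = Registry(String).new
done = Channel(Nil).new

# Several fibers register "clients" concurrently.
4.times do |i|
  spawn do
    registry.add("client-#{i}")
    done.send(nil)
  end
end
4.times { done.receive }

registry.remove("client-0")
puts registry.snapshot.size # => 3
```

This only guards the array itself; it does not by itself make calling `send` on one socket from several fibers safe.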


Thanks for the reply! I'm really enjoying Crystal at the moment. Uploading a Crystal app to Heroku was so easy that I already have one up and running on the web (click one of the "Game Room" links).

http://eatjason-2.gigalixirapp.com/projects/pages/webgames

I was able to make a mutex-free solution to this problem; I find that mutexes clutter up code and create strong coupling between objects and threads. I have some Elixir experience, so I basically made two GenServers: one for the event handler and one for the game state. The event handler has a single fiber that accesses its data, and all changes from outside arrive as events on a channel. The game object is the only object that accesses its own data, except for a few events that need to modify it from the event handler. In those cases the game fiber waits to receive an all-clear message on its channel after the event handler finishes modifying the game data (this only needs to happen twice per frame).
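A stripped-down sketch of the same GenServer-style idea, assuming slot bookkeeping is the only state: one fiber owns the `slots` array, and every change arrives as a message on a channel, with a reply channel for requests that need an answer. `SlotServer`, `Join`, `Leave` and `Msg` are hypothetical names, not the classes from the code below.

```crystal
# Messages the owning fiber understands (hypothetical).
struct Join
  getter reply : Channel(Int32)

  def initialize(@reply : Channel(Int32))
  end
end

struct Leave
  getter id : Int32

  def initialize(@id : Int32)
  end
end

alias Msg = Join | Leave

class SlotServer
  def initialize(size : Int32)
    @inbox = Channel(Msg).new
    @slots = Array(Bool).new(size, false) # only touched by the fiber below

    spawn do
      loop do
        msg = @inbox.receive
        case msg
        when Join
          id = @slots.index(false) || -1 # -1 means "room is full"
          @slots[id] = true if id >= 0
          msg.reply.send(id)
        when Leave
          @slots[msg.id] = false
        end
      end
    end
  end

  # Callable from any fiber: it only exchanges messages with the owner.
  def join : Int32
    reply = Channel(Int32).new
    @inbox.send(Join.new(reply))
    reply.receive
  end

  def leave(id : Int32) : Nil
    @inbox.send(Leave.new(id))
  end
end

server = SlotServer.new(2)
a = server.join # => 0
b = server.join # => 1
c = server.join # => -1 (room is full)
server.leave(a)
d = server.join # => 0 (freed slot is reused)
```

Because the owning fiber handles one message at a time, no two requests ever see `@slots` half-updated, which is the property the mutex would otherwise provide.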

One thing I don't understand about this code is my use of playerId in the socket callbacks. I wasn't sure how else to let those callbacks know which slot ID they belong to. playerId appears to keep the value it had when the callback was defined for the WebSocket, and it stays there even after the loop iteration is done.

# # TODO: Write documentation for `Gameserver`
# module Gameserver
#   VERSION = "0.1.0"

#   # TODO: Put your code here
# end

# 2d vector class
struct Vector
  property x : Float32
  property y : Float32

  def +(other)
    Vector.new(x + other.x, y + other.y)
  end

  def initialize(@x : Float32, @y : Float32)
    
  end
end

# player commands (will include buttons and other actions later)
struct Command
  property type : UInt8 = 0     # this will be used to add and remove player units as players join and leave
  property pos : Vector = Vector.new(0,0)
end

# Events for event handler to process
struct Event

  enum Type
    None = 0
    Join
    Leave
    Update
    Swap
    Broadcast
  end

  property type : Type
  property socket : HTTP::WebSocket?
  property id : Int32
  property command : Command

  def initialize(@type : Type, @socket : HTTP::WebSocket?, @id : Int32, @command : Command)
  
  end

  def self.join(socket : HTTP::WebSocket?)
    Event.new Type::Join, socket, -1, Command.new
  end

  def self.leave(id : Int32)
    Event.new Type::Leave, nil, id, Command.new
  end

  def self.update(id : Int32, command : Command)
    Event.new Type::Update, nil, id, command
  end

  def self.swap
    Event.new Type::Swap, nil, -1, Command.new
  end

  def self.broadcast
    Event.new Type::Broadcast, nil, -1, Command.new
  end

end

# main event handler, this is the only class that accesses its data
class EventHandler
  # this is a direct reference to the game state, only used when the game signals
  # the event handler that it's OK to modify its state
  property game : Game?

  property channel : Channel(Event)           # command channel
  property sockets : Array(HTTP::WebSocket?)  # player sockets
  property commands : Array(Command)          # accumulated commands for frame
  
  # dispatch an event to the event handler
  def dispatch(event : Event)
    @channel.send(event)
  end

  def initialize(players : Int32)

    # set up initial data for handler
    @game = nil
    @channel = Channel(Event).new

    @sockets = Array(HTTP::WebSocket?).new players
    @commands = Array(Command).new players
    (1..players).each { 
      @sockets << nil
      @commands << Command.new
    }

    # this is the main event loop
    spawn do
      loop do
        event = @channel.receive()
        
        case event.type
        # dispatched when a new player joins a session
        when Event::Type::Join
          
          # finds an empty slot for player
          playerId = -1
          @sockets.each_with_index { |socket, i| playerId = i if !socket }
          
          # if a player slot is open
          if playerId != -1

            # socket should never be nil here
            if event.socket
              # Broadcast each message to all clients
              # According to the below link, I would think I wouldn't need the .as(HTTP::WebSocket) here...
              # https://crystal-lang.org/reference/syntax_and_semantics/if_var.html
              event.socket.as(HTTP::WebSocket).on_message do |message|
                # converts message to command struct
                list = message.split(' ');
    
                cmd = Command.new
                cmd.pos = Vector.new(list[0].to_f32, list[1].to_f32)
            
                # *NO IDEA HOW THIS WORKS...
                # somehow the local value of playerId is captured
                # by the fiber that uses this callback?
                @channel.send( Event.update playerId, cmd )

              end  

              # Remove clients from the list when it’s closed
              event.socket.as(HTTP::WebSocket).on_close do
                # Same playerId used in this callback
                @channel.send( Event.leave playerId )
              end

              # playerId index set to event.socket
              sockets[playerId] = event.socket
            end
          
          else
            # we close the connection here because there were no slots left
            event.socket.as(HTTP::WebSocket).close
          
          end

        # dispatched when a player leaves a session
        when Event::Type::Leave
          # close the socket and make it nil so new players can join  
          if sockets[event.id]
            sockets[event.id].as(HTTP::WebSocket).close
            sockets[event.id] = nil
          end

        # dispatched every time a player sends a message
        when Event::Type::Update
          # simply set the command buffer with event command
          commands[event.id] = event.command

        # dispatched when game loop is ready to run a new frame
        when Event::Type::Swap
          # all this does is swap the handler commands and game commands
          # so that the game can use what has been building up 
          # in the handler for the next frame
          game = @game.as(Game)
          temp = @commands
          @commands = game.commands
          game.commands = temp
          # this signals the game fiber that we are done messing with the game state
          # and allow the game to continue its loop
          game.channel.send(true)

        when Event::Type::Broadcast
          # broadcast game frame data to all active sockets
          game = @game.as(Game)
          @sockets.each { |socket| 
            if socket 
              socket.as(HTTP::WebSocket).send(game.outBuffer) 
            end
          }
          # this signals the game fiber that we are done sending off the new frame data to clients
          # and ready to continue its loop
          game.channel.send(true)
        
        else
          # unknown command

        end
    
        
      end  
    end
  
  end


end


# this is the game state class, only the main game loop can access its
# data except for when it signals the handler that it will stop
# touching its memory for the handler to do things with it
# this only happens at the beginning of the loop for input,
# and the end of the loop for output
class Game
  # game event handler, only safe function to call here is 'dispatch'
  property handler : EventHandler

  # these are the only two pieces of data that the handler ever touches
  property commands : Array(Command)  # command array for game to access and update the frame
  property outBuffer : String         # buffer for data to be sent to all sockets

  property channel : Channel(Bool)    # channel for receiving complete signal from handler

  # these will be a series of arrays that will be entity components
  property playerPos : Array(Vector)  # current player positions

  def initialize(players : Int32)
    # initialize game data
    @handler = EventHandler.new players

    @channel = Channel(Bool).new

    rnd = Random.new

    @playerPos = Array(Vector).new players
    @commands = Array(Command).new players
    (1..players).each {
      @playerPos << Vector.new((rnd.rand(600) + 100).to_f32, (rnd.rand(400) + 100).to_f32)
      @commands << Command.new
    }

    @outBuffer = ""

    # this is the main game loop
    spawn do
      loop do
        # measure the frame execution so we can lock the frame rate at 30 fps
        elapsed_time = Time.measure do
          # first thing we do is dispatch a swap event to swap command buffers
          # so we can freely work with commands in game loop, and 
          # simultaneously allow the handler to assemble commands for next frame
          @handler.dispatch( Event.swap )
          @channel.receive()

          # update all player positions, and constrain motion to play field
          @commands.each_with_index { |command, i| 
            v = @playerPos[i] + command.pos
            v.x = 0   if v.x < 0
            v.x = 800 if v.x > 800
            v.y = 0   if v.y < 0
            v.y = 600 if v.y > 600
            @playerPos[i] = v
          }
  
          # assemble frame
          @outBuffer = String.build do |str|
            @playerPos.each { |p| str << "#{p.x} #{p.y} " }  
          end

          # send frame to all sockets
          @handler.dispatch( Event.broadcast )
          @channel.receive()

        end
        delay_time = 33.milliseconds - elapsed_time # sleep for the remaining milliseconds to meet 30 fps
        sleep delay_time if delay_time > Time::Span.zero # skip sleeping when a frame runs long
      end
    end # this is the end for the spawning of the game loop

    # have to set handler game down here now that self is ready with all instance values set
    handler.game = self

  end

end

# now that my data is organized, I can spawn as many game
# instances as I want without worrying about race conditions
MAX_PLAYERS = 10
GAMES = [
  Game.new(MAX_PLAYERS),
  Game.new(MAX_PLAYERS),
  Game.new(MAX_PLAYERS)
]


require "kemal"

ws "/game/:id" do |socket, context|
  id = context.ws_route_lookup.params["id"].to_i32
  if id >= 0 && id < GAMES.size
    # send join message to event handler of game that matches game room id
    GAMES[id].handler.dispatch( Event.join socket )
  end
end



Kemal.run

One thing I don't understand about this code is my use of playerId in the socket callbacks. I wasn't sure how else to let those callbacks know which slot ID they belong to. playerId appears to keep the value it had when the callback was defined for the WebSocket, and it stays there even after the loop iteration is done.

Yes. When a local variable is used in a captured block (in this case the one for on_message), it becomes closured. Wikipedia has a good explanation of closures. This means the local variable survives after the method (in this case initialize) finishes. This is implemented by allocating the variable on the heap and giving the block a reference to it, but that is an implementation detail.
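To illustrate, a small sketch (hypothetical names) where each call to `make_callback` returns a proc that keeps its own `player_id` alive after the method has returned, much like playerId in the on_message callback:

```crystal
# `player_id` is a local of this method; because the proc refers to it,
# it is closured and outlives the call.
def make_callback(player_id : Int32) : Proc(String)
  -> { "update from player #{player_id}" }
end

callbacks = (0...3).map { |slot| make_callback(slot * 10) }

# Each proc still sees the value from its own call.
callbacks.each { |callback| puts callback.call }
# update from player 0
# update from player 10
# update from player 20
```

Each call gets its own closured variable, so the three procs never share a `player_id`.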

However, because you are sharing playerId between different blocks, and these might run in different fibers, you have a bug in your code that will eventually result in a data race. The way to avoid this is to protect the variable with a mutex, or to structure the app differently using channels (I haven't thought through exactly how, but that's the gist of the problem).

I hope this helps!

Thanks! It does!

I modified it a bit: now there's a class that encapsulates the ID and the socket. I imagine the whole instance is captured in this case.

I'm not sure the previous code had a race condition, though, because playerId never changes after it is captured. Since it goes out of scope after every iteration of the event loop, nothing else can change it, right? Each captured playerId goes into a new location on the heap; if every join were using the same heap variable, the code wouldn't work at all.

Anyway, this approach ensures no new variables silently end up on the heap, and I think the SocketData instance is captured only by the two callbacks, on_message and on_close.

class SocketData
  property id : Int32
  property handler : EventHandler
  property socket : HTTP::WebSocket?

  def initialize(@id : Int32, @handler : EventHandler)
    @socket = nil
  end

  def is_open
    @socket != nil
  end

  def send(msg)
    if @socket
      @socket.as(HTTP::WebSocket).send(msg)
    end
  end

  def set_socket(new_socket : HTTP::WebSocket?)
    if @socket
      @socket.as(HTTP::WebSocket).close
      @socket = nil
    end

    if new_socket

      new_socket.as(HTTP::WebSocket).on_message do |message|
        # converts message to command struct
        list = message.split(' ');

        cmd = Command.new
        cmd.pos = Vector.new(list[0].to_f32, list[1].to_f32)
    
        # `id` here calls SocketData#id, so this block captures `self`
        # (this SocketData instance), not a copy of a local variable
        @handler.dispatch( Event.update id, cmd )

      end  

      # Remove clients from the list when it’s closed
      new_socket.as(HTTP::WebSocket).on_close do
        # Same SocketData#id (and the same captured instance) used here
        @handler.dispatch( Event.leave id )
      end

    end

    @socket = new_socket

  end
  

end


# main event handler, this is the only class that accesses its data
class EventHandler
  # this is a direct reference to the game state, only used when the game signals
  # the event handler that it's OK to modify its state
  property game : Game?

  property channel : Channel(Event)           # command channel
  property sockets : Array(SocketData)      # player sockets
  property commands : Array(Command)          # accumulated commands for frame
  
  # dispatch an event to the event handler
  def dispatch(event : Event)
    @channel.send(event)
  end

  def initialize(players : Int32)

    # set up initial data for handler
    @game = nil
    @channel = Channel(Event).new

    @sockets = Array(SocketData).new players
    @commands = Array(Command).new players
    
    
    players.times { |i|
      @sockets << SocketData.new i, self
      @commands << Command.new
    }

    # this is the main event loop
    spawn do
      loop do
        event = @channel.receive()
        
        case event.type
        # dispatched when a new player joins a session
        when Event::Type::Join
          
          # finds an empty slot for player
          playerId = -1
          @sockets.each_with_index { |socket, i| playerId = i if !socket.is_open }
          
          # if a player slot is open
          if playerId != -1

            # socket should never be nil here
            if event.socket
              # According to the below link, I would think I wouldn't need the .as(HTTP::WebSocket) here...
              # https://crystal-lang.org/reference/syntax_and_semantics/if_var.html
              
              # playerId index set to event.socket
              sockets[playerId].set_socket(event.socket)
            end
          
          else
            # we close the connection here because there were no slots left
            event.socket.as(HTTP::WebSocket).close
          
          end

        # dispatched when a player leaves a session
        when Event::Type::Leave
          # close the socket and make it nil so new players can join  
          sockets[event.id].set_socket(nil)  

        # dispatched every time a player sends a message
        when Event::Type::Update
          # simply set the command buffer with event command
          commands[event.id] = event.command

        # dispatched when game loop is ready to run a new frame
        when Event::Type::Swap
          # all this does is swap the handler commands and game commands
          # so that the game can use what has been building up 
          # in the handler for the next frame
          game = @game.as(Game)
          temp = @commands
          @commands = game.commands
          game.commands = temp
          # this signals the game fiber that we are done messing with the game state
          # and allow the game to continue its loop
          game.channel.send(true)

        when Event::Type::Broadcast
          # broadcast game frame data to all active sockets
          game = @game.as(Game)
          @sockets.each { |socket| socket.send(game.outBuffer) }
          # this signals the game fiber that we are done sending off the new frame data to clients
          # and ready to continue its loop
          game.channel.send(true)
        
        else
          # unknown command

        end
    
        
      end  
    end
  
  end


end
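A small sketch of what capturing the instance means, with a hypothetical `Slot` class standing in for SocketData. Because the proc reads `@id`, it captures `self` (a reference to the object), not a snapshot of the value, so later changes to the instance are visible inside the callback:

```crystal
# Hypothetical stand-in for SocketData.
class Slot
  property id : Int32

  def initialize(@id : Int32)
  end

  # The returned proc reads @id, so it captures `self`, not a copy of @id.
  def callback : Proc(String, String)
    ->(msg : String) { "slot #{@id}: #{msg}" }
  end
end

slot = Slot.new(3)
cb = slot.callback
puts cb.call("hi") # => slot 3: hi

slot.id = 7        # mutate the instance after the proc was created
puts cb.call("hi") # => slot 7: hi
```

So if SocketData's fields are ever mutated from another fiber while its callbacks run, the same synchronization question applies to them.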

You might be right; I'm not sure. It's too much code for me to review here on the forum (I lack the time, and I also don't know where specifically to look for issues).

No problem, thanks for the insights though.

So I think you were right about libraries needing to be reviewed. I don't think it's possible to get the original example working (forget about the code from my previous posts) without library changes.

Objects like HTTP::WebSocket can't currently work with parallelism, because the event loop fiber accesses objects that are also needed when sending a message. So at the moment it's not safe to call the send method outside of the event loop at all, whether for broadcasting a message to all sockets or for sending messages at a fixed tick rate.

So I guess Crystal libraries (or at least third-party libraries; I think HTTP::WebSocket comes from Kemal?) have a way to go before multi-threading is safe?

If mutexes are added to HTTP::WebSocket, they should be internal to the class; I shouldn't have to think about them. I like the idea of a class that runs its own fiber and acts more like an Elixir GenServer: all data is private, and the send method does nothing but pass the message to the event fiber, which then actually sends it.
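A minimal sketch of that idea, assuming a generic `IO` as the destination: callers on any fiber only enqueue, and a single writer fiber is the only code that ever writes to the IO. `Outbox` is a hypothetical class, and `IO::Memory` stands in for a socket so the example runs without a network; it does not show whether HTTP::WebSocket#send is safe to call from such a fiber.

```crystal
# Hypothetical single-writer wrapper: only the spawned fiber touches @io.
class Outbox
  def initialize(@io : IO)
    @queue = Channel(String?).new(32) # nil is used as a "stop" sentinel
    @done = Channel(Nil).new

    spawn do
      while msg = @queue.receive
        @io << msg << '\n'
      end
      @done.send(nil)
    end
  end

  # Safe to call from any fiber: it only enqueues.
  def send(msg : String) : Nil
    @queue.send(msg)
  end

  # Let the writer flush everything already queued, then stop it.
  def close : Nil
    @queue.send(nil)
    @done.receive
  end
end

io = IO::Memory.new
outbox = Outbox.new(io)
finished = Channel(Nil).new

3.times do |i|
  spawn do
    outbox.send("tick #{i}")
    finished.send(nil)
  end
end
3.times { finished.receive }

outbox.close
puts io.to_s
```

The order of the three ticks depends on scheduling, but no two fibers ever write to the IO at the same time.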

This approach has a lot of potential: in Elixir all messages are copied, whereas in Crystal you could create a message in memory and then simply transfer ownership of it to the receiving fiber through a channel.
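A quick check of that idea with a hypothetical `Frame` class: sending a class instance over a channel passes the reference, so nothing is copied. Crystal does not enforce the hand-off, though; the sender can still mutate the object afterwards, so "transferring ownership" is a convention the code has to follow.

```crystal
# Hypothetical message type (a class, so it is passed by reference).
class Frame
  getter data : Array(Float32)

  def initialize(@data : Array(Float32))
  end
end

channel = Channel(Frame).new(1) # buffered, so send doesn't wait for a receiver
sent = Frame.new([1.0_f32, 2.0_f32])
channel.send(sent)
received = channel.receive

puts received.same?(sent) # => true (same object, no copy)

# Nothing stops the sender from touching it afterwards:
sent.data << 3.0_f32
puts received.data.size # => 3
```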

Right now it's hard to know which fibers are doing what, and whether they are thread-safe. Is there a way to profile them while the program is running?
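Short of a real profiler, one lightweight option is to name fibers when spawning them and log `Fiber.current.name` from the code you want to trace. A sketch with hypothetical fiber names:

```crystal
log = Channel(String).new

%w(reader writer).each do |role|
  # `name:` labels the fiber; Fiber.current.name reads it back.
  spawn(name: "socket-#{role}") do
    log.send("#{role} ran on fiber #{Fiber.current.name}")
  end
end

lines = Array.new(2) { log.receive }
puts lines.sort
```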

Here's the original code snippet with comments on some of the problems (the last bit of code I'll post on the subject for now):

    require "kemal"

    SOCKETS = [] of HTTP::WebSocket

    ws "/chat" do |socket|
        # Add the client to the SOCKETS list.
        # A mutex here could protect the array of sockets.
        # Note: I don't know whether the ws "/chat" block
        # runs on the same fiber as the socket callbacks.
        # That is an extremely important piece of information,
        # because calling socket methods anywhere but
        # the fiber the socket runs on is a potential
        # race condition.
        SOCKETS << socket  

        # Broadcast each message to all clients
        socket.on_message do |message|
            # I don't think this can be fixed with a mutex,
            # because each socket runs its own fiber
            # and could be performing operations on the socket
            # at the same time we try to send these messages.
            # Since those other operations don't take
            # a mutex, locking one here
            # wouldn't help: the other operations
            # in the socket fiber would ignore it.
            # So I have to conclude that it's not safe to
            # call any socket method from ANYWHERE but
            # the fiber that runs the message callback
            # for the socket.
            SOCKETS.each { |socket| socket.send message}
        end  

        # Remove clients from the list when it’s closed
        socket.on_close do
            # A mutex could also protect the array of sockets here
            SOCKETS.delete socket
        end
    end

    Kemal.run

EDIT: I don't have much experience with concurrent languages yet, but I noticed that Gorilla WebSocket in Go works by making sure only one goroutine writes and only one goroutine reads. This lets the app define two goroutines, one for reading and one for writing. I like this approach because it gives more control over what runs where. I'm not sure what the best way to implement sockets is; it seems to depend on the app whether you want channels or mutexes. I like the idea of making fiber creation explicit to the app using it; hidden fibers seem a bit dangerous.

I was able to narrow down the information I need to make this work properly, but I think I'll have to wait to use Crystal until there's better documentation and libraries are updated to support concurrency. The WebSocket class is implemented with the HTTP::WebSocket::Protocol class. That class doesn't use any kind of lock, and its receive and send methods access and modify instance variables. This means that SOCKETS.each { |socket| socket.send message } in the snippet above is definitely not thread-safe, since every socket fiber could try sending packets to all clients at the same time.

I suppose you could just put a mutex around everything, but that doesn't seem very elegant. In Go, the documentation is clear that you may receive on only one goroutine and send on only one goroutine, which allows one reader goroutine and one writer goroutine. I think this works without any mutexes at all, probably by keeping separate buffers.

So I'll just have to stick with Go for now.
