Trying to understand Crystal concurrency

Thanks for the reply! I’m really enjoying Crystal at the moment. Uploading a Crystal app to Heroku was so easy that I already have it up and running on the web (click one of the “Game Room” links).

http://eatjason-2.gigalixirapp.com/projects/pages/webgames

I was able to make a mutex-less solution for this problem. I find mutexes clutter up code and create a strong coupling between objects and threads. I have some Elixir experience, so I basically made two GenServers: one for the event handler and one for the game state. The event handler has a single fiber that accesses its data, and all changes from outside arrive as events on a channel. The game object is the only object that accesses its own data, with the exception of some events that need to modify it from the event handler. In those cases the game fiber just waits to receive the all-clear message on its channel after the event handler finishes modifying the game data (it only needs to do this twice a frame).
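To show the shape of the idea without the websocket parts, here is a minimal sketch. The `Counter` class, its `add` method, and the reply channel are made up for illustration and are not part of the game server below. The only point is that one fiber owns `@total` and everything else talks to it through a channel.

```crystal
# Hypothetical example: state owned by a single fiber, changed only via a channel.
class Counter
  def initialize
    # each message carries an amount and a channel to send the reply on
    @inbox = Channel(Tuple(Int32, Channel(Int32))).new
    @total = 0

    # this fiber is the only code that ever reads or writes @total
    spawn do
      loop do
        amount, reply = @inbox.receive
        @total += amount
        reply.send @total
      end
    end
  end

  # safe to call from any fiber: it only sends a message and waits for the answer
  def add(amount : Int32) : Int32
    reply = Channel(Int32).new
    @inbox.send({amount, reply})
    reply.receive
  end
end

counter = Counter.new
puts counter.add(2) # 2
puts counter.add(3) # 5
```

The reply channel plays the same role as the `Channel(Bool)` the game fiber waits on: the caller blocks until the owning fiber says it is done.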

One of the main things I do not understand about this code is my use of playerId in the socket callbacks. I wasn’t sure how else to let those callbacks know which slot id they are associated with. Each callback appears to capture playerId as it was when the callback was defined for the websocket, and it keeps that value even after that loop iteration is done.
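As far as I can tell, this is ordinary closure behaviour rather than anything specific to fibers or websockets. Here is a small sketch of what I think is going on. The names `callbacks`, `player_id`, `hits`, and `on_hit` are made up for the example, and procs stand in for the socket callbacks.

```crystal
# A variable first assigned inside a block belongs to that one block invocation,
# so every proc created in the loop keeps its own player_id.
callbacks = [] of Proc(Int32)
3.times do |slot|
  player_id = slot
  callbacks << ->{ player_id }
end

ids = callbacks.map(&.call)
puts ids # [0, 1, 2]

# A variable defined outside the closure is shared with it, not copied:
hits = 0
on_hit = ->{ hits += 1 }
on_hit.call
on_hit.call
puts hits # 2
```

If that reading is right, playerId in the handler behaves like `player_id` here: it is assigned inside the `loop do` block, so each Join event gets a fresh variable for its callbacks to hold on to.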

# # TODO: Write documentation for `Gameserver`
# module Gameserver
#   VERSION = "0.1.0"

#   # TODO: Put your code here
# end

# 2d vector class
struct Vector
  property x : Float32
  property y : Float32

  def +(other)
    Vector.new(x + other.x, y + other.y)
  end

  def initialize(@x : Float32, @y : Float32)
    
  end
end

# player commands (will include buttons and other actions later)
struct Command
  property type : UInt8 = 0     # this will be used to add and remove player units as players join and leave
  property pos : Vector = Vector.new(0,0)
end

# Events for event handler to process
struct Event

  enum Type
    None = 0
    Join
    Leave
    Update
    Swap
    Broadcast
  end

  property type : Type
  property socket : HTTP::WebSocket?
  property id : Int32
  property command : Command

  def initialize(@type : Type, @socket : HTTP::WebSocket?, @id : Int32, @command : Command)
  
  end

  def self.join(socket : HTTP::WebSocket?)
    Event.new Type::Join, socket, -1, Command.new
  end

  def self.leave(id : Int32)
    Event.new Type::Leave, nil, id, Command.new
  end

  def self.update(id : Int32, command : Command)
    Event.new Type::Update, nil, id, command
  end

  def self.swap
    Event.new Type::Swap, nil, -1, Command.new
  end

  def self.broadcast
    Event.new Type::Broadcast, nil, -1, Command.new
  end

end

# main event handler, this is the only class that accesses its data
class EventHandler
  # this is a direct reference to the game state, only used when game signals
  # event handler its ok to modify its state
  property game : Game?

  property channel : Channel(Event)           # command channel
  property sockets : Array(HTTP::WebSocket?)  # player sockets
  property commands : Array(Command)          # accumulated commands for frame
  
  # dispatch an event to the event handler
  def dispatch(event : Event)
    @channel.send(event)
  end

  def initialize(players : Int32)

    # set up initial data for handler
    @game = nil
    @channel = Channel(Event).new

    @sockets = Array(HTTP::WebSocket?).new players
    @commands = Array(Command).new players
    (1..players).each { 
      @sockets << nil
      @commands << Command.new
    }

    # this is the main event loop
    spawn do
      loop do
        event = @channel.receive()
        
        case event.type
        # dispatched when a new player joins a session
        when Event::Type::Join
          
          # finds an empty slot for player
          playerId = -1
          @sockets.each_with_index { |socket, i| playerId = i if !socket }
          
          # if a player slot is open
          if playerId != -1

            # socket should never be nil here
            # `event.socket` is a method call, so `if event.socket` cannot narrow its type;
            # assigning it to a local variable lets the compiler rule out nil
            # https://crystal-lang.org/reference/syntax_and_semantics/if_var.html
            if ws = event.socket
              # forward each incoming message to the handler as an update event
              ws.on_message do |message|
                # converts message to command struct
                list = message.split(' ')
    
                cmd = Command.new
                cmd.pos = Vector.new(list[0].to_f32, list[1].to_f32)
            
                # *NO IDEA HOW THIS WORKS...
                # somehow the local value of playerId is captured
                # by the fiber that uses this callback?
                @channel.send( Event.update playerId, cmd )

              end  

              # free the player's slot when the socket is closed
              ws.on_close do
                # Same playerId used in this callback
                @channel.send( Event.leave playerId )
              end

              # playerId index set to the joined socket
              sockets[playerId] = ws
            end
          
          else
            # we close the connection here because there were no slots left
            event.socket.try &.close
          
          end

        # dispatched when a player leaves a session
        when Event::Type::Leave
          # close the socket and make it nil so new players can join
          if ws = sockets[event.id]
            ws.close
            sockets[event.id] = nil
          end

        # dispatched every time a player sends a message
        when Event::Type::Update
          # simply set the command buffer with the event command
          commands[event.id] = event.command

        # dispatched when game loop is ready to run a new frame
        when Event::Type::Swap
          # all this does is swap the handler commands and game commands
          # so that the game can use what has been building up 
          # in the handler for the next frame
          game = @game.as(Game)
          temp = @commands
          @commands = game.commands
          game.commands = temp
          # this signals the game fiber that we are done messing with the game state
          # and allow the game to continue its loop
          game.channel.send(true)

        when Event::Type::Broadcast
          # broadcast game frame data to all active sockets
          game = @game.as(Game)
          @sockets.each { |socket|
            # socket is a block-local variable, so `if socket` narrows it to HTTP::WebSocket
            if socket
              socket.send(game.outBuffer)
            end
          }
          # this signals the game fiber that we are done sending off the new frame data to clients
          # and it is ready to continue its loop
          game.channel.send(true)
        
        else
          # unknown command

        end
    
        
      end  
    end
  
  end


end


# this is the game state class, only the main game loop can access its
# data except for when it signals the handler that it will stop
# touching its memory so the handler can do things with it
# this only happens at the beginning of the loop for input,
# and at the end of the loop for output
class Game
  # game event handler, only safe function to call here is 'dispatch'
  property handler : EventHandler

  # these are the only two pieces of data that the handler ever touches
  property commands : Array(Command)  # command array for game to access and update the frame
  property outBuffer : String         # buffer for data to be sent to all sockets

  property channel : Channel(Bool)    # channel for receiving the complete signal from handler

  # these will be a series of arrays that will be entity components
  property playerPos : Array(Vector)  # current player positions

  def initialize(players : Int32)
    # initialize game data
    @handler = EventHandler.new players

    @channel = Channel(Bool).new

    rnd = Random.new

    @playerPos = Array(Vector).new players
    @commands = Array(Command).new players
    (1..players).each {
      @playerPos << Vector.new((rnd.rand(600) + 100).to_f32, (rnd.rand(400) + 100).to_f32)
      @commands << Command.new
    }

    @outBuffer = ""

    # this is the main game loop
    spawn do
      loop do
        # measure the frame execution time to lock the framerate at 30 fps
        elapsed_time = Time.measure do
          # first thing we do is dispatch a swap event to swap command buffers
          # so we can freely work with commands in game loop, and 
          # simultaneously allow the handler to assemble commands for next frame
          @handler.dispatch( Event.swap )
          @channel.receive()

          # update all player positions, and constrain motion to play field
          @commands.each_with_index { |command, i| 
            v = @playerPos[i] + command.pos
            v.x = 0   if v.x < 0
            v.x = 800 if v.x > 800
            v.y = 0   if v.y < 0
            v.y = 600 if v.y > 600
            @playerPos[i] = v
          }
  
          # assemble frame
          @outBuffer = String.build do |str|
            @playerPos.each { |p| str << "#{p.x} #{p.y} " }  
          end

          # send frame to all sockets
          @handler.dispatch( Event.broadcast )
          @channel.receive()

        end
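        # sleep for the remaining milliseconds to meet 30 fps;
        # if the frame took longer than 33 ms, skip the sleep instead of passing a negative span
        delay_time = 33.milliseconds - elapsed_time
        sleep delay_time if delay_time > Time::Span.zero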
      end
    end # this is the end for the spawning of the game loop

    # have to set handler game down here now that self is ready with all instance values set
    handler.game = self

  end

end

# now that my data is organized, I can spawn as many game
# instances as I want without worrying about race conditions
MAX_PLAYERS = 10
GAMES = [
  Game.new(MAX_PLAYERS),
  Game.new(MAX_PLAYERS),
  Game.new(MAX_PLAYERS)
]


require "kemal"

ws "/game/:id" do |socket, context|
  id = context.ws_route_lookup.params["id"].to_i32
  if id >= 0 && id < GAMES.size
    # send join message to event handler of game that matches game room id
    GAMES[id].handler.dispatch( Event.join socket )
  end
end



Kemal.run
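
On the `.as(HTTP::WebSocket)` comment in the Join branch: as far as I can tell from the if var docs, the narrowing only applies to local variables, and `event.socket` is a method call (a property getter), so the compiler will not assume two calls return the same type. A small sketch of the difference, using a made-up `Slot` struct and `describe` method instead of the real `Event`:

```crystal
# Hypothetical stand-in for Event: a struct with a nilable property.
struct Slot
  property name : String?

  def initialize(@name : String?)
  end
end

def describe(slot : Slot) : String
  # `if slot.name` would not narrow the type, because `slot.name` is a method call.
  # Assigning the result to a local variable does narrow it to String.
  if name = slot.name
    "taken by #{name.upcase}"
  else
    "free"
  end
end

puts describe(Slot.new("ann")) # taken by ANN
puts describe(Slot.new(nil))   # free
```

The same pattern, `if ws = event.socket`, should remove the need for the cast in the handler.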