Game Coordinator

The Coordinator is the centerpiece of game orchestration. It provides the interface between the agents and the worlds.

In detail it handles:

  1. World initialization
  2. Registration of new agents in the game
  3. Agent-World communication (message verification and forwarding)
  4. Recording (and storing) trajectories of agents (optional)
  5. Detection of episode ends (either by reaching timeout or agents reaching their respective goals)
  6. Assigning rewards for each action and at the end of each episode
  7. Removing agents from the game
  8. Registering the GameReset requests and handling the game resets.

To facilitate communication, the coordinator runs a TCP server to which agents connect. The communication is asynchronous and depends on the

Connection to other game components

As the intermediary in all communication between the agents and the world, the coordinator uses several queues for message passing and handling.

  1. Action queue: a single queue to which all agents submit their actions. It provides an N:1 communication channel through which the coordinator receives its inputs.
  2. Answer queues: one separate queue per agent, through which the results of that agent's actions are sent back to it.
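
The two queue types can be illustrated with a minimal, self-contained sketch. It is not the coordinator's actual code: the `agent`, `coordinator` and `demo` functions, the addresses and the message strings are placeholders chosen for illustration. It only shows the N:1 action queue and the per-agent answer queues built on `asyncio.Queue`.

```python
import asyncio


async def agent(addr, action_queue, answer_queue, action):
    # Every agent pushes (address, message) into the one shared action queue ...
    await action_queue.put((addr, action))
    # ... and waits for the result on its own answer queue.
    return await answer_queue.get()


async def coordinator(action_queue, answer_queues, n_messages):
    # Single consumer of the shared queue (the "1" side of the N:1 channel).
    for _ in range(n_messages):
        addr, message = await action_queue.get()
        # Route the reply only to the queue that belongs to the sender.
        await answer_queues[addr].put(f"result of {message}")


async def demo():
    action_queue = asyncio.Queue()
    addrs = [("127.0.0.1", 5001), ("127.0.0.1", 5002)]
    answer_queues = {addr: asyncio.Queue() for addr in addrs}
    results = await asyncio.gather(
        coordinator(action_queue, answer_queues, len(addrs)),
        agent(addrs[0], action_queue, answer_queues[addrs[0]], "ScanNetwork"),
        agent(addrs[1], action_queue, answer_queues[addrs[1]], "FindData"),
    )
    return results[1:]  # drop the coordinator's own return value (None)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Keying the answer queues by agent address keeps replies isolated: an agent can never read a result meant for another agent, while the coordinator still has a single place to read from.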

Episode

An episode starts once a sufficient number of agents have registered in the game. Each agent role has a maximum number of allowed steps, defined in the task configuration. An episode ends if all agents reach the goal
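
As a rough illustration of the per-agent end condition (goal reached or step limit exhausted), consider the sketch below. The function name, the role names and the limits are hypothetical, and the exact comparison used by the coordinator is an assumption, not taken from its source.

```python
from typing import Optional


def agent_episode_ended(steps_taken: int, max_steps: Optional[int], goal_reached: bool) -> bool:
    """Return True when this agent's episode is over (assumed rule)."""
    if goal_reached:
        return True
    # Assumption: the timeout triggers once the step counter reaches the role's limit.
    return max_steps is not None and steps_taken >= max_steps


# Hypothetical per-role limits, in the spirit of the task configuration.
steps_limit_per_role = {"Attacker": 100, "Defender": None}

print(agent_episode_ended(100, steps_limit_per_role["Attacker"], goal_reached=False))
print(agent_episode_ended(500, steps_limit_per_role["Defender"], goal_reached=False))
```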

netsecgame.game.coordinator.GameCoordinator(game_host, game_port, service_host, service_port, task_config_file)

Class for creating and managing agent interactions in AI Dojo.

Attributes:

    host (str): Host address for the game server.
    port (int): Port number for the game server.
    logger (Logger): Logger for the GameCoordinator.
    config_manager (ConfigurationManager): Manager for game configuration.
    _tasks (set): Set of active asyncio tasks.
    shutdown_flag (Event): Event to signal shutdown.
    _reset_event (Event): Event to signal game reset.
    _episode_end_event (Event): Event to signal episode end.
    _episode_start_event (Event): Event to signal episode start.
    _episode_rewards_condition (Condition): Condition for episode rewards assignment.
    _reset_done_condition (Condition): Condition for reset completion.
    _reset_lock (Lock): Lock for reset operations.
    _agents_lock (Lock): Lock for agent operations.
    _cyst_objects (Lock): CYST simulator initialization objects.
    _cyst_object_string (Lock): String representation of CYST objects.
    _agent_action_queue (Queue): Queue for agent actions.
    _agent_response_queues (dict): Mapping of agent addresses to their response queues.
    agents (dict): Mapping of agent addresses to their information.
    _agent_steps (dict): Step counters per agent address.
    _reset_requests (dict): Reset requests per agent address.
    _randomize_topology_requests (dict): Topology randomization requests per agent address.
    _agent_status (dict): Status of each agent.
    _episode_ends (dict): Episode end flags per agent address.
    _agent_observations (dict): Observations per agent address.
    _agent_starting_position (dict): Starting positions per agent address.
    _agent_states (dict): Current states per agent address.
    _agent_goal_states (dict): Goal states per agent address.
    _agent_last_action (dict): Last actions played by agents.
    _agent_false_positives (dict): False positives per agent.
    _agent_rewards (dict): Rewards per agent address.
    _agent_trajectories (dict): Trajectories per agent address.

Source code in netsecgame/game/coordinator.py
def __init__(self, game_host: str, game_port: int, service_host:str, service_port:int, task_config_file:str) -> None:
    self.host = game_host
    self.port = game_port
    self.logger = logging.getLogger("GameCoordinator")

    self._tasks = set()
    self.shutdown_flag = asyncio.Event()
    self._reset_event = asyncio.Event()
    self._episode_end_event = asyncio.Event()
    self._episode_start_event = asyncio.Event()
    self._episode_rewards_condition = asyncio.Condition()
    self._reset_done_condition = asyncio.Condition()
    self._reset_lock = asyncio.Lock()
    self._agents_lock = asyncio.Lock()

    # Configuration Manager
    self.config_manager = ConfigurationManager(task_config_file, service_host, service_port)

    # prepare agent communication
    self._agent_action_queue = asyncio.Queue()
    self._agent_response_queues = {}

    # agent information
    self.agents = {}
    # step counter per agent_addr (int)
    self._agent_steps = {}
    # reset request per agent_addr (bool)
    self._reset_requests = {}
    self._randomize_topology_requests = {}
    self._agent_status = {}
    self._episode_ends = {}
    self._agent_observations = {}
    # starting position per agent_addr (dict)
    self._agent_starting_position = {}
    # current state per agent_addr (GameState)
    self._agent_states = {}
    # goal state per agent_addr (GameState)
    self._agent_goal_states = {}
    # last action played by agent (Action)
    self._agent_last_action = {}
    # False positives per agent (due to added blocks)
    self._agent_false_positives = {}
    # rewards per agent_addr (int)
    self._agent_rewards = {}
    # trajectories per agent_addr
    self._agent_trajectories = {}

add_false_positive(agent)

Increments the false-positive counter of the given agent.

Args:
    agent (tuple): The agent to add a false positive to.

Source code in netsecgame/game/coordinator.py
def add_false_positive(self, agent:tuple)->None:
    """
    Method for adding false positive to the agent.
    Args:
        agent (tuple): The agent to add false positive to.
    """
    self.logger.debug(f"Adding false positive to {agent}")
    if agent in self._agent_false_positives:
        self._agent_false_positives[agent] += 1
    else:
        self._agent_false_positives[agent] = 1
    self.logger.debug(f"False positives for {agent}: {self._agent_false_positives[agent]}")

create_agent_queue(agent_addr) async

Creates a queue for the given agent address if it doesn't already exist.

Parameters:

Name Type Description Default
agent_addr tuple

The agent address to create a queue for.

required
Source code in netsecgame/game/coordinator.py
async def create_agent_queue(self, agent_addr:tuple)->None:
    """
    Creates a queue for the given agent address if it doesn't already exist.

    Args:
        agent_addr (tuple): The agent address to create a queue for.
    """
    if agent_addr not in self._agent_response_queues:
        self._agent_response_queues[agent_addr] = asyncio.Queue()
        self.logger.info(f"Created queue for agent {agent_addr}. {len(self._agent_response_queues)} queues in total.")

goal_check(agent_addr)

Checks whether the goal conditions are satisfied in the agent's current game state.

Source code in netsecgame/game/coordinator.py
def goal_check(self, agent_addr:tuple)->bool:
    """
    Check if the goal conditions were satisfied in a given game state
    """
    def goal_dict_satistfied(goal_dict:dict, known_dict: dict)-> bool:
        """
        Helper function for checking if a goal dictionary condition is satisfied
        """
        # check if we have all IPs that should have some values (are keys in goal_dict)
        if goal_dict.keys() <= known_dict.keys():
            try:
                # Check if values (sets) for EACH key (host) in goal_dict are subsets of known_dict, keep matching_keys
                matching_keys = [host for host in goal_dict.keys() if goal_dict[host]<= known_dict[host]]
                # Check that the number of matching keys equals the number of keys in goal_dict
                if len(matching_keys) == len(goal_dict.keys()):
                    return True
            except KeyError:
                # some keys are missing in the known_dict
                return False
        return False
    self.logger.debug(f"Checking goal for agent {agent_addr}.")
    state = self._agent_states[agent_addr]
    # For each part of the state of the game, check if the conditions are met
    target_goal_state = self._agent_goal_states[agent_addr]
    self.logger.debug(f"\tGoal conditions: {target_goal_state}.")
    goal_reached = {}    
    goal_reached["networks"] = target_goal_state.known_networks <= state.known_networks
    goal_reached["known_hosts"] = target_goal_state.known_hosts <= state.known_hosts
    goal_reached["controlled_hosts"] = target_goal_state.controlled_hosts <= state.controlled_hosts
    goal_reached["services"] = goal_dict_satistfied(target_goal_state.known_services, state.known_services)
    goal_reached["data"] = goal_dict_satistfied(target_goal_state.known_data, state.known_data)
    goal_reached["known_blocks"] = goal_dict_satistfied(target_goal_state.known_blocks, state.known_blocks)
    self.logger.debug(f"\t{goal_reached}")
    return all(goal_reached.values())
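
The subset logic of the helper can be tried in isolation. The sketch below is a simplified re-statement written for illustration (the name `goal_dict_satisfied` and the sample hosts and services are made up). It relies only on the `<=` subset operator of Python sets: every host required by the goal must be known, and each goal set must be a subset of what the agent knows for that host.

```python
def goal_dict_satisfied(goal: dict, known: dict) -> bool:
    # Every goal key must exist in `known`, and each goal set must be a
    # subset (<=) of the corresponding known set.
    return all(host in known and goal[host] <= known[host] for host in goal)


goal = {"10.0.0.5": {"ssh"}}

# Knowing more than required still satisfies the goal.
print(goal_dict_satisfied(goal, {"10.0.0.5": {"ssh", "http"}, "10.0.0.7": {"ftp"}}))
# The host is known but the required service is missing.
print(goal_dict_satisfied(goal, {"10.0.0.5": {"http"}}))
# The host itself is unknown.
print(goal_dict_satisfied(goal, {"10.0.0.7": {"ssh"}}))
# An empty goal dictionary is trivially satisfied.
print(goal_dict_satisfied({}, {}))
```

The same reasoning applies to the plain set comparisons in `goal_check`: an empty goal component (for example, no required networks) is always satisfied, so only the non-empty parts of the goal state constrain the result.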

is_agent_benign(agent_addr)

Checks whether the agent is benign, i.e. its role is "defender" or "benign". Returns False for unregistered agents.

Source code in netsecgame/game/coordinator.py
def is_agent_benign(self, agent_addr:tuple)->bool:
    """
    Check if the agent is benign (role is "defender" or "benign")
    """
    if agent_addr not in self.agents:
        return False
    #TODO: change to use AgentRole
    return self.agents[agent_addr][1].lower() in ["defender", "benign"]

register_agent(agent_id, agent_role, agent_initial_view, agent_win_condition_view) async

Domain specific method of the environment. Creates the initial state of the agent.

Source code in netsecgame/game/coordinator.py
async def register_agent(self, agent_id:tuple, agent_role:AgentRole, agent_initial_view:dict, agent_win_condition_view:dict)->tuple[GameState, GameState]:
    """
    Domain specific method of the environment. Creates the initial state of the agent.
    """
    raise NotImplementedError

remove_agent(agent_id, agent_state) async

Domain-specific method of the environment. Removes the agent from the game. Must be implemented by the domain-specific environment.

Source code in netsecgame/game/coordinator.py
async def remove_agent(self, agent_id:tuple, agent_state:GameState)->bool:
    """
    Domain specific method of the environment. Removes the agent from the game.
    """
    raise NotImplementedError

reset() async

Domain-specific method of the environment. Resets the environment for a new episode. Must be implemented by the domain-specific environment.

Source code in netsecgame/game/coordinator.py
async def reset(self)->bool:
    """
    Domain specific method of the environment. Resets the environment for a new episode.
    Must be implemented by the domain specific environment.
    """
    raise NotImplementedError

run()

Wrapper for the asyncio.run function. Starts all tasks in AI Dojo.

Source code in netsecgame/game/coordinator.py
def run(self)->None:
    """
    Wrapper for the asyncio.run function. Starts all tasks in AI Dojo.
    """
    try:
        asyncio.run(self.start_tasks())
    except Exception as e:
        self.logger.error(f"Unexpected error: {e}")
    finally:
        self.logger.info(f"{__class__.__name__} has exited.")

run_game() async

Main game loop task.

Responsible for reading messages from the agent queue, parsing them using _parse_action_message, and dispatching them to the appropriate handler using _dispatch_action.

Source code in netsecgame/game/coordinator.py
async def run_game(self):
    """
    Main game loop task. 

    Responsible for reading messages from the agent queue, parsing them using `_parse_action_message`, 
    and dispatching them to the appropriate handler using `_dispatch_action`.
    """
    while not self.shutdown_flag.is_set():
        # Read message from the queue
        agent_addr, message = await self._agent_action_queue.get()
        if message is not None:
            self.logger.info(f"Coordinator received from agent {agent_addr}: {message}.")

            action = self._parse_action_message(agent_addr, message)
            if action:
                self._dispatch_action(agent_addr, action)
    self.logger.info("\tAction processing task stopped.")
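
One property of this loop is worth spelling out: `await queue.get()` suspends until a message arrives, so the shutdown flag is only re-checked after each message, and an idle loop ends through task cancellation (which `start_tasks` performs during cleanup). The sketch below reproduces the pattern with stand-ins. `parse_message`, `game_loop`, `demo` and the JSON messages are hypothetical and do not reflect the real `_parse_action_message` or `_dispatch_action`.

```python
import asyncio
import json


def parse_message(raw: str):
    """Hypothetical stand-in for message parsing: returns None on invalid input."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None


async def game_loop(queue, shutdown_flag, handled):
    while not shutdown_flag.is_set():
        # Suspends here while the queue is empty; the flag is not checked meanwhile.
        agent_addr, message = await queue.get()
        if message is not None:
            action = parse_message(message)
            if action:
                handled.append((agent_addr, action))  # stand-in for dispatching


async def demo():
    queue, shutdown_flag, handled = asyncio.Queue(), asyncio.Event(), []
    task = asyncio.create_task(game_loop(queue, shutdown_flag, handled))
    addr = ("127.0.0.1", 5001)
    await queue.put((addr, '{"action": "scan"}'))
    await queue.put((addr, "not json"))  # dropped by the parser
    while not queue.empty():
        await asyncio.sleep(0)  # let the loop drain the queue
    shutdown_flag.set()
    task.cancel()  # the loop is blocked in get(), so cancel it explicitly
    await asyncio.gather(task, return_exceptions=True)
    return handled


if __name__ == "__main__":
    print(asyncio.run(demo()))
```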

shutdown_signal_handler() async

Logs the signal reception and sets the shutdown flag to initiate graceful termination.

Source code in netsecgame/game/coordinator.py
132
133
134
135
136
137
async def shutdown_signal_handler(self)->None:
    """
    Logs the signal reception and sets the shutdown flag to initiate graceful termination.
    """
    self.logger.info("Shutdown signal received. Setting shutdown flag.")
    self.shutdown_flag.set()

start_tasks() async

High-level function that starts all the other asynchronous tasks:

  - Reads the configuration of the coordinator
  - Creates queues
  - Starts the main part of the coordinator
  - Starts a server that listens for agents

Source code in netsecgame/game/coordinator.py
async def start_tasks(self):
    """
    High level function to start all the other asynchronous tasks.
    - Reads the conf of the coordinator
    - Creates queues
    - Start the main part of the coordinator
    - Start a server that listens for agents
    """
    loop = asyncio.get_running_loop()

    # Set up signal handlers for graceful shutdown
    loop.add_signal_handler(
        signal.SIGINT, lambda: asyncio.create_task(self.shutdown_signal_handler())
    )
    loop.add_signal_handler(
        signal.SIGTERM, lambda: asyncio.create_task(self.shutdown_signal_handler())
    )


    # Initialize configuration manager and load the configuration
    await self.config_manager.load()
    self._cyst_objects = self.config_manager.get_cyst_objects()

    if self.config_manager.get_config_hash():
        self._CONFIG_FILE_HASH = self.config_manager.get_config_hash()

    # Read configuration
    self._starting_positions_per_role = self.config_manager.get_all_starting_positions()
    self._win_conditions_per_role = self.config_manager.get_all_win_conditions()
    self._goal_description_per_role = self.config_manager.get_all_goal_descriptions()
    self._steps_limit_per_role = self.config_manager.get_all_max_steps()

    self.logger.debug(f"Timeouts set to:{self._steps_limit_per_role}")
    if self.config_manager.get_use_global_defender():
        self._global_defender = GlobalDefender()
    else:
        self._global_defender = None
    self._use_dynamic_ips = self.config_manager.get_use_dynamic_ips()
    self.logger.info(f"Change IP every episode set to: {self._use_dynamic_ips}")
    self._rewards = self.config_manager.get_rewards(["step", "success", "fail", "false_positive"])
    self.logger.info(f"Rewards set to:{self._rewards}")
    self._min_required_players = self.config_manager.get_required_num_players()
    self.logger.info(f"Min player requirement set to:{self._min_required_players}")
    # run self initialization
    self._initialize()

    # start server for agent communication
    self._spawn_task(self.start_tcp_server)

    # start episode rewards task
    self._spawn_task(self._assign_rewards_episode_end)

    # start game reset task
    self._spawn_task(self._reset_game)

    # start action processing task
    self._spawn_task(self.run_game)

    while not self.shutdown_flag.is_set():
        # just wait until user terminates
        await asyncio.sleep(1)
    self.logger.debug("Final cleanup started")
    # make sure there are no running tasks left
    for task in self._tasks:
        task.cancel()  # Cancel each active task
    await asyncio.gather(*self._tasks, return_exceptions=True)  # Wait for all tasks to finish
    self.logger.info("All tasks shut down.")
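
The lifecycle used here (spawn background tasks, idle until a shutdown flag is set, then cancel and gather) can be sketched independently of the coordinator. `_spawn_task` is not shown on this page, so the `spawn` method below is only a guess at its general shape. `TaskRunner`, `worker` and `demo` are hypothetical names, and a timer stands in for the SIGINT/SIGTERM handler.

```python
import asyncio


class TaskRunner:
    def __init__(self):
        self._tasks = set()
        self.shutdown_flag = asyncio.Event()

    def spawn(self, coro_fn, *args):
        # Keep a strong reference so the task is not garbage collected
        # and can be cancelled during cleanup.
        task = asyncio.create_task(coro_fn(*args))
        self._tasks.add(task)
        return task

    async def run(self):
        while not self.shutdown_flag.is_set():
            await asyncio.sleep(0.01)  # idle until shutdown is requested
        for task in self._tasks:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising it.
        return await asyncio.gather(*self._tasks, return_exceptions=True)


async def worker():
    while True:
        await asyncio.sleep(0.01)


async def demo():
    runner = TaskRunner()
    runner.spawn(worker)
    runner.spawn(worker)
    # Stand-in for a signal handler that sets the flag.
    asyncio.get_running_loop().call_later(0.05, runner.shutdown_flag.set)
    return await runner.run()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Gathering with `return_exceptions=True` is what lets the cleanup finish quietly: each cancelled task shows up as a `CancelledError` object in the result list rather than propagating out of `gather`.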

start_tcp_server() async

Starts the TCP server for agent communication.

Source code in netsecgame/game/coordinator.py
async def start_tcp_server(self):
    """
    Starts the TCP server for agent communication.
    """
    server = None
    try:
        self.logger.info("Starting the server listening for agents")
        server = await asyncio.start_server(
            AgentServer(
                self._agent_action_queue,
                self._agent_response_queues,
                max_connections=self._min_required_players
            ),
            self.host,
            self.port
        )
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
        self.logger.info(f"\tServing on {addrs}")
        while not self.shutdown_flag.is_set():
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        self.logger.debug("\tStopping TCP server task.")
    except Exception as e:
        self.logger.error(f"TCP server failed: {e}")
    finally:
        if server:
            server.close()
            await server.wait_closed()
        self.logger.info("\tTCP server task stopped")
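
`asyncio.start_server` accepts any callable that takes a `(reader, writer)` pair, which is why an object such as `AgentServer(...)` can be passed directly. The sketch below shows that mechanism with a hypothetical `EchoHandler` class. Its connection limit and upper-casing reply are illustrative assumptions and say nothing about how `AgentServer` actually behaves.

```python
import asyncio


class EchoHandler:
    """Hypothetical callable connection handler (not the real AgentServer)."""

    def __init__(self, max_connections: int):
        self.max_connections = max_connections
        self.current_connections = 0

    async def __call__(self, reader, writer):
        if self.current_connections >= self.max_connections:
            writer.close()  # refuse connections above the limit
            await writer.wait_closed()
            return
        self.current_connections += 1
        try:
            data = await reader.readline()
            writer.write(data.upper())
            await writer.drain()
        finally:
            self.current_connections -= 1
            writer.close()
            await writer.wait_closed()


async def demo():
    # Port 0 lets the operating system pick a free port.
    server = await asyncio.start_server(EchoHandler(max_connections=2), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```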

step(agent_id, agent_state, action) async

Domain-specific method of the environment. Processes the given action of the agent in its current state. Must be implemented by the domain-specific environment.

Source code in netsecgame/game/coordinator.py
async def step(self, agent_id:tuple, agent_state:GameState, action:Action):
    """
    Domain specific method of the environment. Processes the given action of the agent in its current state.
    Must be implemented by the domain specific environment.
    """
    raise NotImplementedError
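
The four methods that raise `NotImplementedError` (`register_agent`, `remove_agent`, `step`, `reset`) are the hooks a concrete world overrides. The toy sketch below shows the pattern with a stand-in base class that mirrors only the signatures listed above. `BaseCoordinator` and `CounterWorld` are hypothetical, and the return value of `step` (the new state) is an assumption, since the page does not document it.

```python
import asyncio


class BaseCoordinator:
    """Toy stand-in for GameCoordinator: only the hook signatures are mirrored."""

    async def register_agent(self, agent_id, agent_role, agent_initial_view, agent_win_condition_view):
        raise NotImplementedError

    async def remove_agent(self, agent_id, agent_state):
        raise NotImplementedError

    async def step(self, agent_id, agent_state, action):
        raise NotImplementedError

    async def reset(self):
        raise NotImplementedError


class CounterWorld(BaseCoordinator):
    """Hypothetical world whose state is a single integer counter."""

    async def register_agent(self, agent_id, agent_role, agent_initial_view, agent_win_condition_view):
        # Returns (initial state, goal state), like tuple[GameState, GameState].
        return agent_initial_view["start"], agent_win_condition_view["target"]

    async def remove_agent(self, agent_id, agent_state):
        return True

    async def step(self, agent_id, agent_state, action):
        # Assumption for this sketch: step returns the agent's new state.
        return agent_state + action

    async def reset(self):
        return True


async def demo():
    world = CounterWorld()
    agent = ("127.0.0.1", 5001)
    state, goal = await world.register_agent(agent, "Attacker", {"start": 0}, {"target": 2})
    while state < goal:
        state = await world.step(agent, state, 1)
    return state, await world.remove_agent(agent, state), await world.reset()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```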

netsecgame.game.worlds.NetSecGame.NetSecGame(game_host, game_port, task_config, seed=None)

Bases: GameCoordinator

Source code in netsecgame/game/worlds/NetSecGame.py
def __init__(self, game_host, game_port, task_config:str, seed=None):
    super().__init__(game_host, game_port, service_host=None, service_port=None, task_config_file=task_config)

    # Internal data structure of the NSG
    self._ip_to_hostname = {} # Mapping of `IP`:`host_name`(str) of all nodes in the environment
    self._networks = {} # A `dict` of the networks present in the environment. Keys: `Network` objects, values `set` of `IP` objects.
    self._services = {} # Dict of all services in the environment. Keys: hostname (`str`), values: `set` of `Service` objects.
    self._data = {} # Dict of all data in the environment. Keys: hostname (`str`), values: `set` of data objects.
    self._data_content = {} # ??? Not sure. Added by sebas to fix error in reading config file
    self._firewall = {} # dict of all the allowed connections in the environment. Keys `IP` ,values: `set` of `IP` objects.
    self._fw_blocks = {}
    self._agent_fw_rules = {}
    # All exploits in the environment
    self._exploits = {}
    # A list of all the hosts where the attacker can start in a random start
    self.hosts_to_start = []
    self._network_mapping = {}
    self._ip_mapping = {}


    np.random.seed(seed)
    random.seed(seed)
    self._seed = seed
    self.logger.info(f'Setting env seed to {seed}')

reset() async

Function to reset the state of the game and prepare for a new episode

Source code in netsecgame/game/worlds/NetSecGame.py
async def reset(self)->bool:
    """
    Function to reset the state of the game
    and prepare for a new episode
    """
    # write all steps in the episode replay buffer in the file
    self.logger.info('--- Resetting NSG Environment to its initial state ---')
    # change IPs if needed
    # This is done ONLY if it is (i) enabled in the task config and (ii) all agents requested it
    if self.config_manager.get_use_dynamic_ips():
        if all(self._randomize_topology_requests.values()):
            self.logger.info("All agents requested reset with randomized topology.")
            self._dynamic_ip_change()
        else:
            self.logger.info("Not all agents requested a topology randomization. Keeping the current one.")
    # reset self._data to original state
    self._data = copy.deepcopy(self._data_original)
    # reset self._data_content to original state
    self._data_content = copy.deepcopy(self._data_content_original)
    # reset all firewall related data structure
    self._firewall = copy.deepcopy(self._firewall_original)
    self._fw_blocks = {}
    self._agent_fw_rules = {}
    return True
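
Two details of this reset are easy to check in isolation: the unanimity test built on `all(...)`, and the restoration of mutable structures through `copy.deepcopy`. The sketch below uses made-up data, and the helper `should_randomize` is a hypothetical name. Note that `all()` over an empty collection is `True`, so with no recorded requests the unanimity condition holds trivially.

```python
import copy


def should_randomize(dynamic_ips_enabled: bool, requests: dict) -> bool:
    # Randomize only if the feature is enabled AND every agent asked for it.
    return dynamic_ips_enabled and all(requests.values())


print(should_randomize(True, {"agent_a": True, "agent_b": True}))
print(should_randomize(True, {"agent_a": True, "agent_b": False}))
print(should_randomize(False, {"agent_a": True}))
print(should_randomize(True, {}))  # all() of an empty collection is True

# Restoring state: deepcopy keeps the pristine original untouched by episode changes.
data_original = {"host_1": {"db_dump"}}
data = copy.deepcopy(data_original)
data["host_1"].add("planted_file")     # mutation during an episode
data = copy.deepcopy(data_original)    # reset for the next episode
print(data == data_original, "planted_file" in data_original["host_1"])
```

A shallow copy would not be enough here: the inner sets would be shared, and changes made during an episode would leak into the stored original.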