Game Coordinator

The Coordinator is the centerpiece of game orchestration. It provides the interface between the agents and the worlds.

In detail it handles:

  1. World initialization
  2. Registration of new agents in the game
  3. Agent-World communication (message verification and forwarding)
  4. Recording (and storing) trajectories of agents (optional)
  5. Detection of episode ends (either by reaching timeout or agents reaching their respective goals)
  6. Assigning rewards for each action and at the end of each episode
  7. Removing agents from the game
  8. Registering GameReset requests and handling game resets

To facilitate communication, the coordinator runs a TCP server to which agents connect. The communication is asynchronous and depends on the world's implementation.

Connection to other game components

As the middleman in all communication between the agents and the world, the Coordinator uses several queues for message passing and handling.

  1. Action queue: a single queue to which all agents submit their actions. It provides an N:1 communication channel through which the coordinator receives its inputs.
  2. Answer queues: one separate queue per agent, through which the results of that agent's actions are sent back to it.
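
The queue layout described above can be sketched with plain asyncio primitives. This is a minimal illustration rather than the coordinator's actual code: the agent addresses, the `agent_send` and `coordinator_loop` helpers, and the `processed:` reply format are assumptions made for the example.

```python
import asyncio


async def agent_send(action_queue: asyncio.Queue, addr: tuple, msg: str) -> None:
    # Every agent writes into the same shared queue (N:1).
    await action_queue.put((addr, msg))


async def coordinator_loop(action_queue: asyncio.Queue, answer_queues: dict, n_messages: int) -> None:
    # Single consumer: read each action and route the reply
    # to the sending agent's own answer queue.
    for _ in range(n_messages):
        addr, msg = await action_queue.get()
        await answer_queues[addr].put(f"processed:{msg}")


async def demo() -> dict:
    agents = [("127.0.0.1", 5001), ("127.0.0.1", 5002)]  # hypothetical addresses
    action_queue = asyncio.Queue()
    answer_queues = {addr: asyncio.Queue() for addr in agents}
    consumer = asyncio.create_task(coordinator_loop(action_queue, answer_queues, len(agents)))
    await agent_send(action_queue, agents[0], "ScanNetwork")
    await agent_send(action_queue, agents[1], "FindData")
    await consumer
    # Each agent reads only from its own answer queue.
    return {addr: await answer_queues[addr].get() for addr in agents}


replies = asyncio.run(demo())
```

Keeping one answer queue per agent means a slow agent never delays the replies addressed to the others.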

Episode

The episode starts once a sufficient number of agents have registered in the game. Each agent role has a maximum allowed number of steps defined in the task configuration. An episode ends when all agents reach their goals or when the maximum number of steps is reached.
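
The end-of-episode rule can be illustrated with two small helpers. This is a simplified sketch under stated assumptions: the function names and the example step limits are hypothetical, and the real coordinator derives the per-agent flag from the agent's status rather than from these arguments.

```python
def agent_episode_ended(goal_reached: bool, steps: int, max_steps: int) -> bool:
    # An agent is done once it reaches its goal or exhausts its step budget.
    return goal_reached or steps >= max_steps


def episode_ended(agent_flags: dict) -> bool:
    # The episode as a whole ends only when every registered agent is done.
    # The bool() guard (an addition of this sketch) treats "no agents" as "not ended".
    return bool(agent_flags) and all(agent_flags.values())


flags = {
    "attacker": agent_episode_ended(goal_reached=False, steps=100, max_steps=100),
    "defender": agent_episode_ended(goal_reached=False, steps=40, max_steps=100),
}
still_running = not episode_ended(flags)
```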

netsecgame.game.coordinator.GameCoordinator

Class for creating and managing agent interactions in AI Dojo.

Attributes:

Name Type Description
host str

Host address for the game server.

port int

Port number for the game server.

logger Logger

Logger for the GameCoordinator.

config_manager ConfigurationManager

Manager for game configuration.

_tasks set

Set of active asyncio tasks.

shutdown_flag Event

Event to signal shutdown.

_reset_event Event

Event to signal game reset.

_episode_end_event Event

Event to signal episode end.

_episode_start_event Event

Event to signal episode start.

_episode_rewards_condition Condition

Condition for episode rewards assignment.

_reset_done_condition Condition

Condition for reset completion.

_reset_lock Lock

Lock for reset operations.

_agents_lock Lock

Lock for agent operations.

_cyst_objects Lock

CYST simulator initialization objects.

_cyst_object_string Lock

String representation of CYST objects.

_agent_action_queue Queue

Queue for agent actions.

_agent_response_queues dict

Mapping of agent addresses to their response queues.

agents dict

Mapping of agent addresses to their information.

_agent_steps dict

Step counters per agent address.

_reset_requests dict

Reset requests per agent address.

_randomize_topology_requests dict

Topology randomization requests per agent address.

_reset_seed_requests dict

Reset seeds requested per agent address.

_agent_status dict

Status of each agent.

_episode_ends dict

Episode end flags per agent address.

_agent_observations dict

Observations per agent address.

_agent_starting_position dict

Starting positions per agent address.

_agent_states dict

Current states per agent address.

_agent_goal_states dict

Goal states per agent address.

_agent_last_action dict

Last actions played by agents.

_agent_false_positives dict

False positives per agent.

_agent_rewards dict

Rewards per agent address.

_agent_trajectories dict

Trajectories per agent address.

Source code in netsecgame/game/coordinator.py
def __init__(self, game_host: str, game_port: int, service_host:str, service_port:int, task_config_file:str) -> None:
    self.host = game_host
    self.port = game_port
    self.logger = logging.getLogger("GameCoordinator")

    self._tasks = set()
    self.shutdown_flag = asyncio.Event()
    self._reset_event = asyncio.Event()
    self._episode_end_event = asyncio.Event()
    self._episode_start_event = asyncio.Event()
    self._episode_rewards_condition = asyncio.Condition()
    self._reset_done_condition = asyncio.Condition()
    self._reset_lock = asyncio.Lock()
    self._agents_lock = asyncio.Lock()

    # Configuration Manager
    self.config_manager = ConfigurationManager(task_config_file, service_host, service_port)

    # prepare agent communication
    self._agent_action_queue = asyncio.Queue()
    self._agent_response_queues = {}

    # agent information
    self.agents = {}
    # step counter per agent_addr (int)
    self._agent_steps = {}
    # reset request per agent_addr (bool)
    self._reset_requests = {}
    self._randomize_topology_requests = {}
    self._reset_seed_requests = {}
    self._agent_status = {}
    self._episode_ends = {}
    self._agent_observations = {}
    # starting position per agent_addr (dict)
    self._agent_starting_position = {}
    # current state per agent_addr (GameState)
    self._agent_states = {}
    # goal state per agent_addr (GameState)
    self._agent_goal_states = {}
    # last action played by agent (Action)
    self._agent_last_action = {}
    # False positives per agent (due to added blocks)
    self._agent_false_positives = {}
    # rewards per agent_addr (int)
    self._agent_rewards = {}
    # trajectories per agent_addr
    self._agent_trajectories = {}

_add_step_to_trajectory

Adds a single step (state, action, reward) to the agent's trajectory.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
action Action

The action performed.

required
reward float

The reward received.

required
next_state GameState

The resulting state.

required
end_reason Optional[str]

An optional reason if the episode ended.

None

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
def _add_step_to_trajectory(self, agent_addr: tuple, action: Action, reward: float, next_state: GameState, end_reason: Optional[str] = None) -> None:
    """
    Adds a single step (state, action, reward) to the agent's trajectory.

    Args:
        agent_addr (tuple): The address of the agent.
        action (Action): The action performed.
        reward (float): The reward received.
        next_state (GameState): The resulting state.
        end_reason (Optional[str]): An optional reason if the episode ended.

    Returns:
        None
    """
    if agent_addr in self._agent_trajectories:
        self.logger.debug(f"Adding step to trajectory of {agent_addr}")
        self._agent_trajectories[agent_addr]["trajectory"]["actions"].append(action.as_dict)
        self._agent_trajectories[agent_addr]["trajectory"]["rewards"].append(reward)
        self._agent_trajectories[agent_addr]["trajectory"]["states"].append(next_state.as_dict)
        if end_reason:
            self._agent_trajectories[agent_addr]["end_reason"] = end_reason
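
The record layout implied by the keys accessed above can be sketched as follows. The `new_trajectory` and `add_step` helpers are hypothetical, the action and state dictionaries are placeholders, and the exact fields produced by `_reset_trajectory` are not shown in this section, so treat the initial structure as an assumption.

```python
from typing import Optional


def new_trajectory() -> dict:
    # Layout inferred from the keys used in _add_step_to_trajectory.
    return {
        "trajectory": {"states": [], "actions": [], "rewards": []},
        "end_reason": None,
    }


def add_step(traj: dict, action: dict, reward: float, next_state: dict,
             end_reason: Optional[str] = None) -> None:
    # One step appends exactly one entry to each of the three parallel lists.
    traj["trajectory"]["actions"].append(action)
    traj["trajectory"]["rewards"].append(reward)
    traj["trajectory"]["states"].append(next_state)
    if end_reason:
        # Only a non-empty reason overwrites the stored value.
        traj["end_reason"] = end_reason


traj = new_trajectory()
add_step(traj, {"type": "ScanNetwork"}, -1, {"known_hosts": 2})
add_step(traj, {"type": "ExfiltrateData"}, -1, {"known_hosts": 2}, end_reason="Success")
```

Because the lists are parallel, index `i` in `actions`, `rewards`, and `states` always refers to the same step.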

_assign_rewards_episode_end async

Task that waits for all agents to finish and then assigns final rewards.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _assign_rewards_episode_end(self) -> None:
    """
    Task that waits for all agents to finish and then assigns final rewards.

    Returns:
        None
    """
    self.logger.debug("Starting task for episode end reward assigning.")
    while not self.shutdown_flag.is_set():
        # wait until episode is finished by all agents
        done, pending = await asyncio.wait(
           [asyncio.create_task(self._episode_end_event.wait()), 
            asyncio.create_task(self.shutdown_flag.wait())],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Check if shutdown_flag was set
        if self.shutdown_flag.is_set():
            self.logger.debug("\tExiting reward assignment task.")
            break
        if len(self.agents) > 0:        
            self.logger.info("Episode finished. Assigning final rewards to agents.")
            async with self._agents_lock:
                attackers = [a for a,(_, a_role) in self.agents.items() if a_role.lower() == "attacker"]
                defenders = [a for a,(_, a_role) in self.agents.items() if a_role.lower() == "defender"]
                successful_attack = False
                # award attackers
                for agent in attackers:
                    self.logger.debug(f"Processing reward for agent {agent}")
                    if self._agent_status[agent] is AgentStatus.Success:
                        self._agent_rewards[agent] += self._rewards["success"]
                        successful_attack = True
                    else:
                        self._agent_rewards[agent] += self._rewards["fail"]

                # award defenders
                for agent in defenders:
                    self.logger.debug(f"Processing reward for agent {agent}")
                    if not successful_attack:
                        self._agent_rewards[agent] += self._rewards["success"]
                        self._agent_status[agent] = AgentStatus.Success
                    else:
                        self._agent_rewards[agent] += self._rewards["fail"]
                        self._agent_status[agent] = AgentStatus.Fail
                    # decrease the reward for false positives
                    self.logger.debug(f"Processing false positives for agent {agent}: {self._agent_false_positives[agent]}")
                    self._agent_rewards[agent] += self._agent_false_positives[agent] * self._rewards["false_positive"]
        # clear the episode end event
        self._episode_end_event.clear()
        # notify all waiting agents
        async with self._episode_rewards_condition:
            self._episode_rewards_condition.notify_all()
    self.logger.info("\tReward assignment task stopped.")
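
The reward rule in the listing above can be restated as a pure function, which makes the arithmetic easier to follow. This is a sketch, not the library's API: `final_rewards` is a hypothetical helper, and the reward values are made up for the example (the real ones come from the task configuration).

```python
def final_rewards(roles: dict, attacker_success: dict, false_positives: dict, rewards: dict) -> dict:
    """Compute end-of-episode reward increments per agent."""
    totals = {agent: 0 for agent in roles}
    successful_attack = False
    # Attackers are rewarded individually for reaching their goal.
    for agent, role in roles.items():
        if role == "attacker":
            if attacker_success[agent]:
                totals[agent] += rewards["success"]
                successful_attack = True
            else:
                totals[agent] += rewards["fail"]
    # Defenders succeed only if no attacker succeeded.
    for agent, role in roles.items():
        if role == "defender":
            totals[agent] += rewards["fail"] if successful_attack else rewards["success"]
            # Each false positive adds the (typically negative) penalty.
            totals[agent] += false_positives.get(agent, 0) * rewards["false_positive"]
    return totals


example_rewards = {"success": 100, "fail": -10, "false_positive": -5}  # assumed values
totals = final_rewards(
    roles={"a1": "attacker", "a2": "attacker", "d1": "defender"},
    attacker_success={"a1": True, "a2": False},
    false_positives={"d1": 2},
    rewards=example_rewards,
)
```

With these assumed values, one successful attacker is enough to make the defender fail, and the defender's two false positives lower its total further.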

_dispatch_action

Dispatches an Action to the appropriate processing method based on its type.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent performing the action.

required
action Action

The Action object to be processed.

required
Source code in netsecgame/game/coordinator.py
def _dispatch_action(self, agent_addr: tuple, action: Action) -> None:
    """
    Dispatches an Action to the appropriate processing method based on its type.

    Args:
        agent_addr (tuple): The address of the agent performing the action.
        action (Action): The Action object to be processed.
    """
    match action.type:
        case ActionType.JoinGame:
            self.logger.debug(f"[{agent_addr}] Start processing of ActionType.JoinGame")
            self._spawn_task(self._process_join_game_action, agent_addr, action)
        case ActionType.QuitGame:
            self.logger.debug(f"[{agent_addr}] Start processing of ActionType.QuitGame")
            self._spawn_task(self._process_quit_game_action, agent_addr)
        case ActionType.ResetGame:
            self.logger.debug(f"[{agent_addr}] Start processing of ActionType.ResetGame")
            self._spawn_task(self._process_reset_game_action, agent_addr, action)
        case ActionType.ExfiltrateData | ActionType.FindData | ActionType.ScanNetwork | ActionType.FindServices | ActionType.ExploitService | ActionType.BlockIP:
            self.logger.debug(f"[{agent_addr}] Start processing of {action.type}")
            self._spawn_task(self._process_game_action, agent_addr, action)
        case _:
            self.logger.warning(f"[{agent_addr}] Unsupported action type: {action}!")

_handle_invalid_reset async

Handles an invalid reset request by notifying agents and shutting down.

Parameters:

Name Type Description Default
error_msg str

The error message explaining why the reset is invalid.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _handle_invalid_reset(self, error_msg: str) -> None:
    """
    Handles an invalid reset request by notifying agents and shutting down.

    Args:
        error_msg (str): The error message explaining why the reset is invalid.

    Returns:
        None
    """
    self.logger.error(error_msg)
    for agent in self.agents:
        async with self._agents_lock:
            output_message_dict = {
                "to_agent": agent,
                "status": str(GameStatus.BAD_REQUEST),
                "observation": None,
                "message": {"message": error_msg},
            }
            response_msg_json = convert_msg_dict_to_json(output_message_dict)
            await self._agent_response_queues[agent].put(response_msg_json)
    self.shutdown_flag.set()

_handle_valid_reset async

Handles a valid reset request by resetting the world and agents.

Parameters:

Name Type Description Default
seed Optional[int]

The random seed to use for the new episode.

required
topology_change Optional[bool]

Whether to randomize the topology.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _handle_valid_reset(self, seed: Optional[int], topology_change: Optional[bool]) -> None:
    """
    Handles a valid reset request by resetting the world and agents.

    Args:
        seed (Optional[int]): The random seed to use for the new episode.
        topology_change (Optional[bool]): Whether to randomize the topology.

    Returns:
        None
    """
    self.logger.info(f"Resetting game to initial state with seed: {seed} and topology change: {topology_change}")
    # reset the game 
    await self.reset(seed=seed, topology_change=topology_change)
    for agent in self.agents:
        if self.config_manager.get_store_trajectories():
            async with self._agents_lock:
                self._store_trajectory_to_file(agent)
        self.logger.debug(f"Resetting agent {agent}")
        agent_role = self.agents[agent][1]
        # reset the agent in the world
        new_state, new_goal_state = await self.reset_agent(agent, agent_role, self._starting_positions_per_role[agent_role], self._win_conditions_per_role[agent_role])
        new_observation = Observation(new_state, 0, False, {})
        async with self._agents_lock:
            self._agent_states[agent] = new_state
            self._agent_goal_states[agent] = new_goal_state
            self._agent_observations[agent] = new_observation
            self._episode_ends[agent] = False
            self._reset_requests[agent] = False
            self._randomize_topology_requests.pop(agent, None)
            self._reset_seed_requests.pop(agent, None)
            self._agent_rewards[agent] = 0
            self._agent_steps[agent] = 0
            self._agent_false_positives[agent] = 0
            if self.agents[agent][1].lower() == "attacker":
                self._agent_status[agent] = AgentStatus.PlayingWithTimeout
            else:
                self._agent_status[agent] = AgentStatus.Playing

_initialize

Initializes the environment state and components.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
def _initialize(self) -> None:
    """
    Initializes the environment state and components.

    Returns:
        None
    """
    raise NotImplementedError

_initialize_new_player

Initializes a new player's state and data upon joining the game.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
agent_current_state GameState

The initial state assigned to the agent.

required
agent_current_goal_state GameState

The goal state assigned to the agent.

required

Returns:

Name Type Description
Observation Observation

The initial observation for the agent.

Source code in netsecgame/game/coordinator.py
def _initialize_new_player(self, agent_addr: tuple, agent_current_state: GameState, agent_current_goal_state: GameState) -> Observation:
    """
    Initializes a new player's state and data upon joining the game.

    Args:
        agent_addr (tuple): The address of the agent.
        agent_current_state (GameState): The initial state assigned to the agent.
        agent_current_goal_state (GameState): The goal state assigned to the agent.

    Returns:
        Observation: The initial observation for the agent.
    """
    self.logger.info(f"\tInitializing new player{agent_addr}")
    agent_name, agent_role = self.agents[agent_addr]
    self._agent_steps[agent_addr] = 0
    self._reset_requests[agent_addr] = False
    self._episode_ends[agent_addr] = False
    self._agent_starting_position[agent_addr] = self._starting_positions_per_role[agent_role]
    self._agent_states[agent_addr] = agent_current_state
    self._agent_goal_states[agent_addr] = agent_current_goal_state
    self._agent_last_action[agent_addr] = None
    self._agent_rewards[agent_addr] = 0
    self._agent_false_positives[agent_addr] = 0
    if agent_role in [AgentRole.Attacker]:
        self._agent_status[agent_addr] = AgentStatus.PlayingWithTimeout
    else:
        self._agent_status[agent_addr] = AgentStatus.Playing
    self._agent_trajectories[agent_addr] = self._reset_trajectory(agent_addr)
    self.logger.info(f"\tAgent {agent_name} ({agent_addr}), registered as {agent_role}")
    # create initial observation
    return Observation(self._agent_states[agent_addr], 0, False, {})

_parse_action_message

Parses a JSON message from an agent into an Action object.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent sending the message (used for logging context).

required
message str

The raw JSON string message received from the agent.

required

Returns:

Type Description
Optional[Action]

Optional[Action]: The parsed Action object if successful, None otherwise.

Source code in netsecgame/game/coordinator.py
def _parse_action_message(self, agent_addr: tuple, message: str) -> Optional[Action]:
    """
    Parses a JSON message from an agent into an Action object.

    Args:
        agent_addr (tuple): The address of the agent sending the message (used for logging context).
        message (str): The raw JSON string message received from the agent.

    Returns:
        Optional[Action]: The parsed Action object if successful, None otherwise.
    """
    try:
        action = Action.from_json(message)
        return action
    except Exception as e:
        self.logger.error(f"Error when converting msg from {agent_addr} to Action using Action.from_json():{e}, {message}")
        return None
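
The parse-or-`None` contract can be demonstrated without the library. The sketch below uses plain `json` in place of `Action.from_json`, and the message fields are invented for illustration. Unlike the listing above, it catches only `json.JSONDecodeError` rather than every exception.

```python
import json
from typing import Optional


def parse_message(message: str) -> Optional[dict]:
    # Return the decoded object on success and None on any malformed input,
    # so the caller can branch on the result instead of handling exceptions.
    try:
        data = json.loads(message)
    except json.JSONDecodeError:
        return None
    return data if isinstance(data, dict) else None


ok = parse_message('{"action_type": "ScanNetwork"}')  # hypothetical payload
bad = parse_message("not json")
```

A caller would typically check `if action is None` and skip dispatching, which keeps one malformed message from interrupting the processing of the others.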

_process_game_action async

Processes a generic game action (Scan, Exploit, etc.).

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
action Action

The Action object to process.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _process_game_action(self, agent_addr: tuple, action: Action) -> None:
    """
    Processes a generic game action (Scan, Exploit, etc.).

    Args:
        agent_addr (tuple): The address of the agent.
        action (Action): The Action object to process.

    Returns:
        None
    """
    if self._episode_ends[agent_addr]:
        self.logger.warning(f"Agent {agent_addr}({self.agents[agent_addr]}) is attempting to play action {action} after the end of the episode!")
        # agent can't play any more actions in the game
        current_observation = self._agent_observations[agent_addr]
        reward = self._agent_rewards[agent_addr]
        end_reason = str(self._agent_status[agent_addr])
        new_observation = Observation(
            current_observation.state,
            reward=reward,
            end=True,
            info={'end_reason': end_reason, "info":"Episode ended. Request reset for starting new episode."})
        output_message_dict = {
            "to_agent": agent_addr,
            "observation": observation_as_dict(new_observation),
            "status": str(GameStatus.FORBIDDEN),
        }
    else:
        async with self._agents_lock:
            self._agent_last_action[agent_addr] = action
            self._agent_steps[agent_addr] += 1
        # wait for the new state from the world
        new_state = await self.step(agent_id=agent_addr, agent_state=self._agent_states[agent_addr], action=action)

        # update agent's values
        async with self._agents_lock:
            # store new state of the agent
            self._agent_states[agent_addr] = new_state

            # update the agent's status using the new state
            self._agent_status[agent_addr] = self._update_agent_status(agent_addr)

            # set the reward for this step (other rewards are added at the end of the episode)
            self._agent_rewards[agent_addr] = self._rewards["step"]

            # check if the episode ends for this agent
            self._episode_ends[agent_addr] = self._update_agent_episode_end(agent_addr)

            # check if the episode ends
            if all(self._episode_ends.values()):
                self._episode_end_event.set()
        if self._episode_ends[agent_addr]:
            # episode ended for this agent - wait for the others to finish
            async with self._episode_rewards_condition:
                await self._episode_rewards_condition.wait()
        # collect the end reason in the 'info' field once the agent is no longer playing
        info = {}
        if self._agent_status[agent_addr] not in [AgentStatus.Playing, AgentStatus.PlayingWithTimeout]:
            info["end_reason"] = str(self._agent_status[agent_addr])
        # append the step to the trajectory (a no-op if no trajectory is tracked for this agent)
        async with self._agents_lock:
            self._add_step_to_trajectory(agent_addr, action, self._agent_rewards[agent_addr], new_state, end_reason=info.get("end_reason", ""))
        # build the observation that is sent back to the agent
        new_observation = Observation(self._agent_states[agent_addr], self._agent_rewards[agent_addr], self._episode_ends[agent_addr], info=info)
        self._agent_observations[agent_addr] = new_observation
        output_message_dict = {
            "to_agent": agent_addr,
            "observation": observation_as_dict(new_observation),
            "status": str(GameStatus.OK),
        }
    response_msg_json = convert_msg_dict_to_json(output_message_dict)
    await self._agent_response_queues[agent_addr].put(response_msg_json)
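
The hand-off between this method and `_assign_rewards_episode_end` relies on an `asyncio.Condition`: agents whose episode has ended wait on it, and the reward task wakes them all once the final rewards are in place. The following is a reduced sketch of that pattern. The helper names and reward numbers are assumptions, and the `asyncio.sleep(0)` call only serves to let the waiters start in this demo.

```python
import asyncio


async def finished_agent(name: str, cond: asyncio.Condition, rewards: dict, results: dict) -> None:
    # An agent whose episode ended blocks until the final rewards are assigned.
    async with cond:
        await cond.wait()
    results[name] = rewards[name]


async def assign_rewards(cond: asyncio.Condition, rewards: dict) -> None:
    # Apply the end-of-episode rewards first, then wake every waiting agent.
    rewards["attacker"] += 100
    rewards["defender"] -= 10
    async with cond:
        cond.notify_all()


async def demo() -> dict:
    cond = asyncio.Condition()
    rewards = {"attacker": -1, "defender": -1}  # assumed per-step rewards
    results = {}
    waiters = [asyncio.create_task(finished_agent(n, cond, rewards, results)) for n in rewards]
    await asyncio.sleep(0)  # let both waiters reach cond.wait()
    await assign_rewards(cond, rewards)
    await asyncio.gather(*waiters)
    return results


results = asyncio.run(demo())
```

Waiting on the condition ensures that the observation sent to each agent already includes its final reward.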

_process_join_game_action async

Processes an Action of type ActionType.JoinGame.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
action Action

The JoinGame Action object.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _process_join_game_action(self, agent_addr: tuple, action: Action) -> None:
    """
    Processes an Action of type ActionType.JoinGame.

    Args:
        agent_addr (tuple): The address of the agent.
        action (Action): The JoinGame Action object.

    Returns:
        None
    """
    try:
        self.logger.info(f"New Join request by  {agent_addr}.")
        if agent_addr not in self.agents:
            agent_name = sanitize_agent_name(str(action.parameters["agent_info"].name))
            agent_role = action.parameters["agent_info"].role
            if agent_role in AgentRole:
                # add agent to the world
                new_agent_game_state, new_agent_goal_state = await self.register_agent(agent_addr, agent_role, self._starting_positions_per_role[agent_role], self._win_conditions_per_role[agent_role])
                if new_agent_game_state: # successful registration
                    async with self._agents_lock:
                        self.agents[agent_addr] = (agent_name, agent_role)
                        observation = self._initialize_new_player(agent_addr, new_agent_game_state, new_agent_goal_state)
                        self._agent_observations[agent_addr] = observation
                        #if len(self.agents) == self._min_required_players:
                        if sum(1 for v in self._agent_status.values() if v == AgentStatus.PlayingWithTimeout) >= self._min_required_players:
                            # set the event so the episode can start
                            self._episode_start_event.set()
                            self.logger.info("Enough players joined. Starting the episode.")
                        else:
                            self.logger.debug("Waiting for other players to join.")
                    # wait for required number of players
                    await self._episode_start_event.wait()
                    output_message_dict = {
                        "to_agent": agent_addr,
                        "status": str(GameStatus.CREATED),
                        "observation": observation_as_dict(observation),
                        "message": {
                            "message": f"Welcome {agent_name}, registred as {agent_role}",
                            "max_steps": self._steps_limit_per_role[agent_role],
                            "goal_description": self._goal_description_per_role[agent_role],
                            "actions": [str(a) for a in ActionType],
                            "configuration_hash": self._CONFIG_FILE_HASH
                            },
                    }
                    if hasattr(self, "_registration_info"):
                        for key, value in self._registration_info.items():
                            output_message_dict["message"][key] = value
                    await self._agent_response_queues[agent_addr].put(convert_msg_dict_to_json(output_message_dict))
            else:
                self.logger.info(
                    f"\tError in registration, unknown agent role: {agent_role}!"
                )
                output_message_dict = {
                    "to_agent": agent_addr,
                    "status": str(GameStatus.BAD_REQUEST),
                    "message": f"Incorrect agent_role {agent_role}",
                }
                response_msg_json = convert_msg_dict_to_json(output_message_dict)
                await self._agent_response_queues[agent_addr].put(response_msg_json)
        else:
            self.logger.info("\tError in registration, agent already exists!")
            output_message_dict = {
                    "to_agent": agent_addr,
                    "status": str(GameStatus.BAD_REQUEST),
                    "message": "Agent already exists.",
                }
            response_msg_json = convert_msg_dict_to_json(output_message_dict)
            await self._agent_response_queues[agent_addr].put(response_msg_json)
    except asyncio.CancelledError:
        self.logger.debug(f"Processing JoinAction of agent {agent_addr} interrupted")
        raise  # Ensure the exception propagates
    finally:
        self.logger.debug(f"Cleaning up after JoinGame for {agent_addr}.")
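
The start gate used in the join flow can be reduced to a few lines: every joining agent registers, the one that completes the quorum sets an `asyncio.Event`, and all of them receive their welcome message only after the event is set. In this sketch the `join` helper and the quorum of two are assumptions. For simplicity it counts every joined player, whereas the listing above counts only agents with status `PlayingWithTimeout`.

```python
import asyncio


async def join(name: str, joined: list, start_event: asyncio.Event, min_players: int) -> str:
    joined.append(name)
    if len(joined) >= min_players:
        # The agent that completes the quorum releases everyone.
        start_event.set()
    # Early joiners block here until the quorum is reached.
    await start_event.wait()
    return f"{name} started with {len(joined)} players"


async def demo() -> list:
    start_event = asyncio.Event()
    joined = []
    return await asyncio.gather(*(join(n, joined, start_event, 2) for n in ("a", "b")))


messages = asyncio.run(demo())
```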

_process_quit_game_action async

Processes an Action of type ActionType.QuitGame.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _process_quit_game_action(self, agent_addr: tuple) -> None:
    """
    Processes an Action of type ActionType.QuitGame.

    Args:
        agent_addr (tuple): The address of the agent.

    Returns:
        None
    """
    try:
        if agent_addr in self._agent_states:
            await self.remove_agent(agent_addr, self._agent_states[agent_addr])
        else:
            self.logger.warning(f"Agent address {agent_addr} not found in _agent_states. Skipping removal.")
        agent_info = await self._remove_agent_from_game(agent_addr)
        self.logger.info(f"Agent {agent_addr} removed from the game. {agent_info}")
    except asyncio.CancelledError:
        self.logger.debug(f"Processing QuitAction of agent {agent_addr} interrupted")
        raise  # Ensure the exception propagates
    finally:
        self.logger.debug(f"Cleaning up after QuitGame for {agent_addr}.")

_process_reset_game_action async

Processes an Action of type ActionType.ResetGame.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
reset_action Action

The ResetGame Action object.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _process_reset_game_action(self, agent_addr: tuple, reset_action: Action) -> None:
    """
    Processes an Action of type ActionType.ResetGame.

    Args:
        agent_addr (tuple): The address of the agent.
        reset_action (Action): The ResetGame Action object.

    Returns:
        None
    """
    self.logger.debug("Beginning the _process_reset_game_action.")
    async with self._reset_lock:
        # add reset request for this agent
        self._reset_requests[agent_addr] = True
        # get the seed for the reset (default None - no change to the rng)
        self._reset_seed_requests[agent_addr] = reset_action.parameters.get("seed", None)
        self.logger.debug(f"Agent {agent_addr} requested reset with seed {self._reset_seed_requests[agent_addr]}")
        # record the topology randomization request,
        # but ONLY for agents that submitted a seed
        if self._reset_seed_requests[agent_addr] is not None:
            self._randomize_topology_requests[agent_addr] = reset_action.parameters.get("randomize_topology", True)
        if all(self._reset_requests.values()):
            # all agents want reset - reset the world
            self.logger.debug("All agents requested reset, setting the event")
            self._reset_event.set()

    # wait until reset is done
    async with self._reset_done_condition:
        await self._reset_done_condition.wait()
    # make sure there are still enough players to play
    await self._episode_start_event.wait()
    async with self._agents_lock:
        output_message_dict = {
            "to_agent": agent_addr,
            "status": str(GameStatus.RESET_DONE),
            "observation": observation_as_dict(self._agent_observations[agent_addr]),
            "message": {
                        "message": "Resetting Game and starting again.",
                        "max_steps": self._steps_limit_per_role[self.agents[agent_addr][1]],
                        "goal_description": self._goal_description_per_role[self.agents[agent_addr][1]],
                        "configuration_hash": self._CONFIG_FILE_HASH
                        },
        }
        # extend the message with last trajectory
        if "request_trajectory" in reset_action.parameters and reset_action.parameters["request_trajectory"]:
            output_message_dict["message"]["last_trajectory"] = self._agent_trajectories[agent_addr]
        self._agent_trajectories[agent_addr] = self._reset_trajectory(agent_addr)
    response_msg_json = convert_msg_dict_to_json(output_message_dict)
    await self._agent_response_queues[agent_addr].put(response_msg_json)
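
The first half of the handler above is a barrier: each agent records its request under a lock, and the last agent to do so sets the event. The sketch below isolates that pattern. `ResetBarrier` and `request_reset` are hypothetical names used only for illustration and are not part of the coordinator.

```python
import asyncio


class ResetBarrier:
    """Hypothetical stand-in for the coordinator's reset bookkeeping."""

    def __init__(self, agents):
        # One flag per registered agent, initially "no reset requested".
        self.requests = {agent: False for agent in agents}
        self.lock = asyncio.Lock()
        self.all_requested = asyncio.Event()

    async def request_reset(self, agent):
        async with self.lock:
            self.requests[agent] = True
            # The event fires only once every agent has asked for a reset.
            if all(self.requests.values()):
                self.all_requested.set()


async def demo():
    agents = [("127.0.0.1", 5001), ("127.0.0.1", 5002)]
    barrier = ResetBarrier(agents)
    await barrier.request_reset(agents[0])
    after_first = barrier.all_requested.is_set()
    await barrier.request_reset(agents[1])
    after_second = barrier.all_requested.is_set()
    return after_first, after_second
```

Calling `asyncio.run(demo())` shows the event staying unset after the first request and being set after the second.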

_remove_agent_from_game async

Removes a player from the GameCoordinator's tracking and returns their final info.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: A dictionary containing final agent statistics and state.

Source code in netsecgame/game/coordinator.py
async def _remove_agent_from_game(self, agent_addr: tuple) -> Dict[str, Any]:
    """
    Removes a player from the GameCoordinator's tracking and returns their final info.

    Args:
        agent_addr (tuple): The address of the agent.

    Returns:
        Dict[str, Any]: A dictionary containing final agent statistics and state.
    """
    self.logger.info(f"Removing player {agent_addr} from the GameCoordinator")
    agent_info = {}
    async with self._agents_lock:
        if agent_addr in self.agents:
            agent_info["state"] = self._agent_states.pop(agent_addr)
            agent_info["goal_state"] = self._agent_goal_states.pop(agent_addr)
            agent_info["num_steps"] = self._agent_steps.pop(agent_addr)
            agent_info["agent_status"] = self._agent_status.pop(agent_addr)
            agent_info["false_positives"] = self._agent_false_positives.pop(agent_addr)
            async with self._reset_lock:
                # remove agent from  topology reset requests
                agent_info["topology_reset_request"] = self._randomize_topology_requests.pop(agent_addr, False)
                # remove agent from reset requests
                agent_info["reset_request"] = self._reset_requests.pop(agent_addr)
                agent_info["reset_seed"] = self._reset_seed_requests.pop(agent_addr, None)
                # check if this agent was not preventing reset 
                if all(self._reset_requests.values()):
                    if len(self.agents) > 0:
                        self._reset_event.set()
                agent_info["episode_end"] = self._episode_ends.pop(agent_addr)
                #check if this agent was not preventing episode end
                if all(self._episode_ends.values()):
                    if len(self.agents) > 0:
                        self._episode_end_event.set()
            agent_info["end_reward"] = self._agent_rewards.pop(agent_addr, None)
            agent_info["agent_info"] = self.agents.pop(agent_addr)
            self.logger.debug(f"\t{agent_info}")
            # clear the sufficient number of players event
            self._episode_start_event.clear()
        else:
            self.logger.info(f"\t Player {agent_addr} not present in the game!")
        return agent_info

_reset_game async

Task that waits for all agents to request resets and coordinates the process.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _reset_game(self) -> None:
    """
    Task that waits for all agents to request resets and coordinates the process.

    Returns:
        None
    """
    self.logger.debug("Starting task for game reset handling.")
    while not self.shutdown_flag.is_set():
        # wait until episode is finished by all agents
        done, pending = await asyncio.wait(
           [asyncio.create_task(self._reset_event.wait()), 
            asyncio.create_task(self.shutdown_flag.wait())],
            return_when=asyncio.FIRST_COMPLETED,
        )
        # Check if shutdown_flag was set
        if self.shutdown_flag.is_set():
            self.logger.debug("\tExiting reset_game task.")
            break
        if len(self.agents) > 0: 
            # verify that all agents agreed on the seed (or sent None)
            valid_seeding = False
            valid_topology_change = False
            non_none_seeds = [seed for seed in self._reset_seed_requests.values() if seed is not None]
            if len(non_none_seeds) == 0: # no agent wants to change the seed
                seed = None
                valid_seeding = True
            elif len(set(non_none_seeds)) == 1: # all agents agree on the seed
                seed = non_none_seeds[0]
                valid_seeding = True
            else: # agents disagree on the seed
                seed = None
            # verify that all agents agreed on the topology change (or sent None)
            valid_seed_agents = [agent for agent in self.agents if self._reset_seed_requests[agent] is not None]
            valid_topology_requests = [self._randomize_topology_requests[agent] for agent in valid_seed_agents]
            if len(set(valid_topology_requests)) == 1: # all valid agents agree on the topology change
                valid_topology_change = True
                topology_change = valid_topology_requests[0]
            else: # agents disagree on the topology change
                valid_topology_change = False
                topology_change = None

            if valid_seeding and valid_topology_change:
                await self._handle_valid_reset(seed, topology_change)
                self._reset_event.clear()  
                # notify all waiting agents
                async with self._reset_done_condition:
                    self._reset_done_condition.notify_all()
            elif not valid_seeding:
                await self._handle_invalid_reset("Agents disagree on the seed. Undefined state. Stopping the game")
                self._reset_event.clear()  
            elif not valid_topology_change:
                await self._handle_invalid_reset("Agents disagree on the topology change. Undefined state. Stopping the game")
                self._reset_event.clear()  
        self._reset_event.clear()  
        # notify all waiting agents
        async with self._reset_done_condition:
            self._reset_done_condition.notify_all()
    self.logger.info("\tReset game task stopped.")
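
The agreement rules in the listing can be read as two pure functions. The sketch below restates them outside the coordinator. `resolve_seed` and `resolve_topology` are hypothetical helper names, and the dictionaries stand in for `_reset_seed_requests` and `_randomize_topology_requests`.

```python
from typing import Dict, Hashable, Optional, Tuple


def resolve_seed(seed_requests: Dict[Hashable, Optional[int]]) -> Tuple[bool, Optional[int]]:
    """Return (valid, seed) following the seed agreement rule in the listing."""
    non_none = [seed for seed in seed_requests.values() if seed is not None]
    if not non_none:
        return True, None          # no agent asked for a new seed
    if len(set(non_none)) == 1:
        return True, non_none[0]   # all explicit requests agree
    return False, None             # conflicting seeds


def resolve_topology(
    seed_requests: Dict[Hashable, Optional[int]],
    topology_requests: Dict[Hashable, bool],
) -> Tuple[bool, Optional[bool]]:
    """Return (valid, topology_change); only agents that sent a seed get a vote."""
    votes = [topology_requests[agent] for agent, seed in seed_requests.items() if seed is not None]
    if len(set(votes)) == 1:
        return True, votes[0]
    return False, None
```

Read this way, the topology check as written in the listing passes only when at least one agent supplied a seed, because an empty list of votes has zero distinct values rather than one.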

_reset_trajectory

Resets and initializes a new trajectory dictionary for an agent.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The initial trajectory dictionary.

Source code in netsecgame/game/coordinator.py
def _reset_trajectory(self, agent_addr: tuple) -> Dict[str, Any]:
    """
    Resets and initializes a new trajectory dictionary for an agent.

    Args:
        agent_addr (tuple): The address of the agent.

    Returns:
        Dict[str, Any]: The initial trajectory dictionary.
    """
    agent_name, agent_role = self.agents[agent_addr]
    self.logger.debug(f"Resetting trajectory of {agent_addr}")
    return {
            "trajectory":{
                "states":[self._agent_states[agent_addr].as_dict],
                "actions":[],
                "rewards":[],
            },
            "end_reason":None,
            "agent_role":agent_role,
            "agent_name":agent_name
        }

_respond_on_bad_request async

Sends a response to the agent indicating that the request was bad.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
message str

The descriptive error message.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def _respond_on_bad_request(self, agent_addr: tuple, message: str) -> None:
    """
    Sends a response to the agent indicating that the request was bad.

    Args:
        agent_addr (tuple): The address of the agent.
        message (str): The descriptive error message.

    Returns:
        None
    """
    output_message_dict = {
        "to_agent": agent_addr,
        "status": str(GameStatus.BAD_REQUEST),
        "observation": None,
        "message": {
            "message": f"Bad request received: {message}",
            }
    }
    await self._agent_response_queues[agent_addr].put(convert_msg_dict_to_json(output_message_dict))
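
To show the shape of the reply an agent would read from its answer queue, here is a minimal sketch. It assumes `json.dumps` as a stand-in for `convert_msg_dict_to_json` and a placeholder status string, since the real value comes from `GameStatus`.

```python
import asyncio
import json


async def respond_bad_request(queues, agent_addr, message):
    payload = {
        "to_agent": agent_addr,
        "status": "BAD_REQUEST",  # placeholder; the coordinator uses str(GameStatus.BAD_REQUEST)
        "observation": None,
        "message": {"message": f"Bad request received: {message}"},
    }
    # Each agent has its own queue, so the reply reaches only that agent.
    await queues[agent_addr].put(json.dumps(payload))


async def demo():
    agent = ("127.0.0.1", 5001)
    queues = {agent: asyncio.Queue()}
    await respond_bad_request(queues, agent, "Malformed Action")
    return json.loads(await queues[agent].get())
```

With plain `json.dumps`, the address tuple is serialized as a JSON array, so the decoded `to_agent` field is a list.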

_spawn_task

Helper function to make sure all tasks are registered for proper termination.

Parameters:

Name Type Description Default
coroutine Coroutine

The coroutine function to schedule.

required
*args tuple

Positional arguments to pass to the coroutine.

()
**kwargs dict

Keyword arguments to pass to the coroutine.

{}

Returns:

Type Description
Task

asyncio.Task: The created task object.

Source code in netsecgame/game/coordinator.py
def _spawn_task(self, coroutine:Coroutine, *args:tuple, **kwargs:dict)->asyncio.Task:
    """
    Helper function to make sure all tasks are registered for proper termination.

    Args:
        coroutine: The coroutine function to schedule.
        *args: Positional arguments to pass to the coroutine.
        **kwargs: Keyword arguments to pass to the coroutine.

    Returns:
        asyncio.Task: The created task object.
    """
    task = asyncio.create_task(coroutine(*args, **kwargs))
    self._tasks.add(task)
    def remove_task(t):
        self._tasks.discard(t)
    task.add_done_callback(remove_task)  # Remove task when done
    return task
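
The registration pattern used by `_spawn_task` can be tried in isolation. In the sketch below, `TaskRegistry` and `work` are hypothetical names. The done callback removes the task from the set, so the set only ever holds tasks that are still running.

```python
import asyncio
from typing import Set


class TaskRegistry:
    """Hypothetical helper that tracks tasks until they finish."""

    def __init__(self) -> None:
        self.tasks: Set[asyncio.Task] = set()

    def spawn(self, coroutine_fn, *args, **kwargs) -> asyncio.Task:
        task = asyncio.create_task(coroutine_fn(*args, **kwargs))
        self.tasks.add(task)
        # set.discard accepts the finished task as its only argument.
        task.add_done_callback(self.tasks.discard)
        return task


async def work(x):
    await asyncio.sleep(0)
    return x * 2


async def demo():
    registry = TaskRegistry()
    task = registry.spawn(work, 21)
    tracked_while_running = task in registry.tasks
    result = await task
    await asyncio.sleep(0)  # give the done callback a chance to run
    return tracked_while_running, result, len(registry.tasks)
```

Keeping a strong reference in the set also prevents a pending task from being garbage-collected before it completes.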

_store_trajectory_to_file

Stores the collected trajectory for an agent to a JSONL file.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required
location str

The directory where the file should be saved.

'./logs/trajectories'

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
def _store_trajectory_to_file(self, agent_addr: tuple, location: str = "./logs/trajectories") -> None:
    """
    Stores the collected trajectory for an agent to a JSONL file.

    Args:
        agent_addr (tuple): The address of the agent.
        location (str): The directory where the file should be saved.

    Returns:
        None
    """
    if agent_addr in self.agents:
        agent_name, agent_role = self.agents[agent_addr]
        filename =f"{datetime.now():%Y-%m-%d}_{agent_name}_{agent_role}"
        trajectories = self._agent_trajectories[agent_addr]
        store_trajectories_to_jsonl(trajectories, location, filename)
        self.logger.info(f"Trajectories of {agent_addr} stored in {os.path.join(location, filename)}.jsonl")
    else:
        self.logger.warning(f"Agent {agent_addr} not found in agents list, can't store trajectory to file.")
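
The file name is built from the current date, the agent name and the agent role. The sketch below reproduces only that formatting step. `trajectory_filename` is a hypothetical helper, and the clock is passed in explicitly so the result is reproducible.

```python
from datetime import datetime


def trajectory_filename(agent_name: str, agent_role: str, now: datetime) -> str:
    """Build the base file name (without extension) used for a trajectory file."""
    return f"{now:%Y-%m-%d}_{agent_name}_{agent_role}"
```

Because the name has day resolution, two runs of the same agent and role on the same day resolve to the same base name.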

_update_agent_episode_end

Update the episode end status of an agent.

Parameters:

Name Type Description Default
agent tuple

The agent to update the episode end status of.

required

Returns:

Name Type Description
bool bool

True if the episode has ended, False otherwise.

Source code in netsecgame/game/coordinator.py
def _update_agent_episode_end(self, agent:tuple)->bool:
    """
    Update the episode end status of an agent.
    Args:
        agent (tuple): The agent to update the episode end status of.

    Returns:
        bool: True if the episode has ended, False otherwise.
    """
    episode_end = False
    if  self._agent_status[agent] in [AgentStatus.Success, AgentStatus.Fail, AgentStatus.TimeoutReached]:
        # agent reached goal, timeout or was detected
        episode_end = True
    # check if there are any agents playing with timeout
    elif all(
            status != AgentStatus.PlayingWithTimeout
            for status in self._agent_status.values()
        ):
        # all attackers have finished - terminate episode
        self.logger.info(f"Stopping episode for {agent} because there is no ACTIVE agent playing.")
        episode_end = True
    return episode_end
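
The rule above has two parts: an agent's episode ends when its own status is terminal, or when no agent at all is still playing against a step limit. A minimal sketch follows, using a hypothetical `Status` enum in place of `AgentStatus`. The `Playing` member (an agent without a step limit) is an assumption made for the example.

```python
from enum import Enum, auto
from typing import Dict, Hashable


class Status(Enum):
    """Hypothetical stand-in for AgentStatus."""
    Playing = auto()             # assumed: agent with no step limit
    PlayingWithTimeout = auto()
    Success = auto()
    Fail = auto()
    TimeoutReached = auto()


TERMINAL = {Status.Success, Status.Fail, Status.TimeoutReached}


def episode_ended(agent: Hashable, statuses: Dict[Hashable, Status]) -> bool:
    if statuses[agent] in TERMINAL:
        return True
    # Otherwise the episode ends once nobody is playing with a timeout.
    return all(status != Status.PlayingWithTimeout for status in statuses.values())
```

For example, an agent in the assumed `Playing` state keeps going while another agent is `PlayingWithTimeout`, and its episode ends as soon as that agent succeeds, fails or times out.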

_update_agent_status

Update the status of an agent based on reaching the goal, timeout or detection.

Parameters:

Name Type Description Default
agent tuple

The agent to update the status of.

required

Returns:

Name Type Description
AgentStatus AgentStatus

The new status of the agent.

Source code in netsecgame/game/coordinator.py
def _update_agent_status(self, agent:tuple)->AgentStatus:
    """
    Update the status of an agent based on reaching the goal, timeout or detection.
    Args:
        agent (tuple): The agent to update the status of.

    Returns:
        AgentStatus: The new status of the agent.
    """
    # read current status of the agent
    next_status = self._agent_status[agent]
    if self.goal_check(agent):
        # Goal has been reached
        self.logger.info(f"Agent {agent}{self.agents[agent]} reached the goal!")
        next_status = AgentStatus.Success
    elif self.is_detected(agent):
        # Detection by Global Defender
        self.logger.info(f"Agent {agent}{self.agents[agent]} detected by GlobalDefender!")
        next_status = AgentStatus.Fail
    elif self.is_timeout(agent):
        # Timeout reached
        self.logger.info(f"Agent {agent}{self.agents[agent]} reached timeout ({self._agent_steps[agent]} steps).")
        next_status = AgentStatus.TimeoutReached
    return next_status
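
The branch order in the listing gives goal completion priority over detection, and detection priority over timeout. The sketch below makes that precedence explicit with plain booleans. `next_status` and the string labels are illustrative only.

```python
def next_status(current: str, goal_reached: bool, detected: bool, timed_out: bool) -> str:
    """Apply the same precedence as the listing: goal, then detection, then timeout."""
    if goal_reached:
        return "Success"
    if detected:
        return "Fail"
    if timed_out:
        return "TimeoutReached"
    return current  # nothing happened, keep the current status
```

An agent that reaches its goal on its final allowed step is therefore reported as successful rather than timed out.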

add_false_positive

Adds a false positive to the agent's count.

Parameters:

Name Type Description Default
agent tuple

The agent to add the false positive to.

required

Source code in netsecgame/game/coordinator.py
def add_false_positive(self, agent:tuple)->None:
    """
    Method for adding false positive to the agent.
    Args:
        agent (tuple): The agent to add false positive to.
    """
    self.logger.debug(f"Adding false positive to {agent}")
    if agent in self._agent_false_positives:
        self._agent_false_positives[agent] += 1
    else:
        self._agent_false_positives[agent] = 1
    self.logger.debug(f"False positives for {agent}: {self._agent_false_positives[agent]}")
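
The "increment or initialize" branch above is the behaviour that `collections.Counter` provides out of the box. The following is only an equivalent sketch with a module-level counter, not a proposed change to the coordinator.

```python
from collections import Counter

# Missing keys read as 0, so no explicit initialization is needed.
false_positives = Counter()


def add_false_positive(agent: tuple) -> int:
    """Increment and return the agent's false-positive count."""
    false_positives[agent] += 1
    return false_positives[agent]
```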

create_agent_queue async

Creates a queue for the given agent address if it doesn't already exist.

Parameters:

Name Type Description Default
agent_addr tuple

The agent address to create a queue for.

required
Source code in netsecgame/game/coordinator.py
async def create_agent_queue(self, agent_addr:tuple)->None:
    """
    Creates a queue for the given agent address if it doesn't already exist.

    Args:
        agent_addr (tuple): The agent address to create a queue for.
    """
    if agent_addr not in self._agent_response_queues:
        self._agent_response_queues[agent_addr] = asyncio.Queue()
        self.logger.info(f"Created queue for agent {agent_addr}. {len(self._agent_response_queues)} queues in total.")

goal_check

Checks if the goal conditions for specific agent were satisfied.

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required

Returns:

Name Type Description
bool bool

True if the goal is reached, False otherwise.

Source code in netsecgame/game/coordinator.py
def goal_check(self, agent_addr: tuple) -> bool:
    """
    Checks if the goal conditions for specific agent were satisfied.

    Args:
        agent_addr (tuple): The address of the agent.

    Returns:
        bool: True if the goal is reached, False otherwise.
    """
    def goal_dict_satistfied(goal_dict: Dict[Any, Set], known_dict: Dict[Any, Set]) -> bool:
        """
        Helper function for checking if a goal dictionary condition is satisfied.

        Args:
            goal_dict (Dict[Any, Set]): The target dictionary (IP -> set of values).
            known_dict (Dict[Any, Set]): The agent's currently known values.

        Returns:
            bool: True if known_dict satisfies the goal_dict.
        """
        # check if we have all IPs that should have some values (are keys in goal_dict)
        if goal_dict.keys() <= known_dict.keys():
            try:
                # Check if values (sets) for EACH key (host) in goal_dict are subsets of known_dict, keep matching_keys
                matching_keys = [host for host in goal_dict.keys() if goal_dict[host]<= known_dict[host]]
                # Check that the number of matching keys equals the number of keys in goal_dict
                if len(matching_keys) == len(goal_dict.keys()):
                    return True
            except KeyError:
                # some keys are missing in the known_dict
                return False
        return False
    self.logger.debug(f"Checking goal for agent {agent_addr}.")
    state = self._agent_states[agent_addr]
    # For each part of the state of the game, check if the conditions are met
    target_goal_state = self._agent_goal_states[agent_addr]
    self.logger.debug(f"\tGoal conditions: {target_goal_state}.")
    goal_reached = {}    
    goal_reached["networks"] = target_goal_state.known_networks <= state.known_networks
    goal_reached["known_hosts"] = target_goal_state.known_hosts <= state.known_hosts
    goal_reached["controlled_hosts"] = target_goal_state.controlled_hosts <= state.controlled_hosts
    goal_reached["services"] = goal_dict_satistfied(target_goal_state.known_services, state.known_services)
    goal_reached["data"] = goal_dict_satistfied(target_goal_state.known_data, state.known_data)
    goal_reached["known_blocks"] = goal_dict_satistfied(target_goal_state.known_blocks, state.known_blocks)
    self.logger.debug(f"\t{goal_reached}")
    return all(goal_reached.values())
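
The dictionary part of the goal check reduces to a per-key subset test. The compact restatement below is a sketch with illustrative data, where hosts are plain strings rather than the game's own types.

```python
from typing import Any, Dict, Set


def goal_dict_satisfied(goal: Dict[Any, Set], known: Dict[Any, Set]) -> bool:
    """True when every goal key is known and its goal set is a subset of the known set."""
    return all(key in known and goal[key] <= known[key] for key in goal)
```

An empty goal dictionary is trivially satisfied, which matches the listing: with no keys to check, the number of matching keys equals the number of goal keys.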

is_agent_benign

Checks if the agent has a benign role (Defender or Benign).

Parameters:

Name Type Description Default
agent_addr tuple

The address of the agent.

required

Returns:

Name Type Description
bool bool

True if the agent is benign, False otherwise.

Source code in netsecgame/game/coordinator.py
def is_agent_benign(self, agent_addr: tuple) -> bool:
    """
    Checks if the agent has a benign role (Defender or Benign).

    Args:
        agent_addr (tuple): The address of the agent.

    Returns:
        bool: True if the agent is benign, False otherwise.
    """
    if agent_addr not in self.agents:
        return False
    #TODO: change to use AgentRole
    return self.agents[agent_addr][1].lower() in ["defender", "benign"]

is_detected

Checks if the agent's last action was detected by the global defender.

Parameters:

Name Type Description Default
agent tuple

The address of the agent.

required

Returns:

Name Type Description
bool bool

True if detected, False otherwise.

Source code in netsecgame/game/coordinator.py
def is_detected(self, agent: tuple) -> bool:
    """
    Checks if the agent's last action was detected by the global defender.

    Args:
        agent (tuple): The address of the agent.

    Returns:
        bool: True if detected, False otherwise.
    """
    if self._global_defender:
        detection = self._global_defender.stochastic_with_threshold(self._agent_last_action[agent], self._agent_trajectories[agent]["trajectory"]["actions"])
        self.logger.debug(f"Global Detection result: {detection}")
        return detection
    else:
        # No global defender
        return False

is_timeout

Checks if the agent has reached its maximum step limit.

Parameters:

Name Type Description Default
agent tuple

The address of the agent.

required

Returns:

Name Type Description
bool bool

True if timeout reached, False otherwise.

Source code in netsecgame/game/coordinator.py
def is_timeout(self, agent: tuple) -> bool:
    """
    Checks if the agent has reached its maximum step limit.

    Args:
        agent (tuple): The address of the agent.

    Returns:
        bool: True if timeout reached, False otherwise.
    """
    timeout_reached = False
    if self._steps_limit_per_role[self.agents[agent][1]]:
        if self._agent_steps[agent] >= self._steps_limit_per_role[self.agents[agent][1]]:
            timeout_reached = True
    return timeout_reached
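
The timeout test treats a falsy limit as "no limit". A standalone sketch of the same condition follows, with the step count and the limit passed in directly instead of being looked up per role.

```python
from typing import Optional


def is_timeout(steps_taken: int, step_limit: Optional[int]) -> bool:
    """A falsy limit (None or 0) disables the timeout, as in the listing."""
    return bool(step_limit) and steps_taken >= step_limit
```

Note that the comparison is `>=`, so the timeout is reported on the step that reaches the limit, not on the one after it.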

register_agent async

Domain-specific method to register an agent and create its initial and goal states.

Parameters:

Name Type Description Default
agent_id tuple

The identifier for the agent.

required
agent_role AgentRole

The role of the agent.

required
agent_initial_view Dict[str, Any]

The initial starting view for the agent.

required
agent_win_condition_view Dict[str, Any]

The win conditions for the agent.

required

Returns:

Type Description
Tuple[GameState, GameState]

Tuple[GameState, GameState]: A tuple containing (initial_state, goal_state).

Source code in netsecgame/game/coordinator.py
async def register_agent(self, agent_id: tuple, agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]:
    """
    Domain-specific method to register an agent and create its initial and goal states.

    Args:
        agent_id (tuple): The identifier for the agent.
        agent_role (AgentRole): The role of the agent.
        agent_initial_view (Dict[str, Any]): The initial starting view for the agent.
        agent_win_condition_view (Dict[str, Any]): The win conditions for the agent.

    Returns:
        Tuple[GameState, GameState]: A tuple containing (initial_state, goal_state).
    """
    raise NotImplementedError

remove_agent async

Domain-specific method to remove an agent from the environment.

Parameters:

Name Type Description Default
agent_id tuple

The identifier for the agent.

required
agent_state GameState

The last known state of the agent.

required

Returns:

Name Type Description
bool bool

True if removal was successful, False otherwise.

Source code in netsecgame/game/coordinator.py
async def remove_agent(self, agent_id: tuple, agent_state: GameState) -> bool:
    """
    Domain-specific method to remove an agent from the environment.

    Args:
        agent_id (tuple): The identifier for the agent.
        agent_state (GameState): The last known state of the agent.

    Returns:
        bool: True if removal was successful, False otherwise.
    """
    raise NotImplementedError

reset async

Domain specific method of the environment. Creates the initial state of the agent. Must be implemented by the domain specific environment.

Parameters:

Name Type Description Default
seed int

Seed for the random number generator. Defaults to None.

None
Source code in netsecgame/game/coordinator.py
async def reset(self, seed:Optional[int]=None)->bool:
    """
    Domain specific method of the environment. Creates the initial state of the agent.
    Must be implemented by the domain specific environment.

    Args:
        seed (int, optional): Seed for the random number generator. Defaults to None.
    """
    raise NotImplementedError

reset_agent async

Domain-specific method to reset an agent's state for a new episode.

Parameters:

Name Type Description Default
agent_id tuple

The identifier for the agent.

required
agent_role AgentRole

The role of the agent.

required
agent_initial_view Dict[str, Any]

The new starting view for the agent.

required
agent_win_condition_view Dict[str, Any]

The win conditions for the agent.

required

Returns:

Type Description
Tuple[GameState, GameState]

Tuple[GameState, GameState]: A tuple containing (new_state, new_goal_state).

Source code in netsecgame/game/coordinator.py
async def reset_agent(self, agent_id: tuple, agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]:
    """
    Domain-specific method to reset an agent's state for a new episode.

    Args:
        agent_id (tuple): The identifier for the agent.
        agent_role (AgentRole): The role of the agent.
        agent_initial_view (Dict[str, Any]): The new starting view for the agent.
        agent_win_condition_view (Dict[str, Any]): The win conditions for the agent.

    Returns:
        Tuple[GameState, GameState]: A tuple containing (new_state, new_goal_state).
    """
    raise NotImplementedError

run

Wrapper for asyncio run function. Starts all tasks in AIDojo.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
def run(self) -> None:
    """
    Wrapper for asyncio run function. Starts all tasks in AIDojo.

    Returns:
        None
    """
    try:
        asyncio.run(self.start_tasks())
    except Exception as e:
        self.logger.error(f"Unexpected error: {e}")
    finally:
        self.logger.info(f"{__class__.__name__} has exited.")

run_game async

Main game loop task.

Responsible for reading messages from the agent queue, parsing them using _parse_action_message, and dispatching them to the appropriate handler using _dispatch_action.

Source code in netsecgame/game/coordinator.py
async def run_game(self):
    """
    Main game loop task. 

    Responsible for reading messages from the agent queue, parsing them using `_parse_action_message`, 
    and dispatching them to the appropriate handler using `_dispatch_action`.
    """
    while not self.shutdown_flag.is_set():
        # Read message from the queue
        agent_addr, message = await self._agent_action_queue.get()
        if message is not None:
            self.logger.info(f"Coordinator received from agent {agent_addr}: {message}.")

            action = self._parse_action_message(agent_addr, message)
            if action is not None:
                self._dispatch_action(agent_addr, action)
            else:
                self._spawn_task(self._respond_on_bad_request, agent_addr, "Malformed Action")
    self.logger.info("\tAction processing task stopped.")
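
The loop above is a single consumer on an N:1 queue: read a message, parse it, then either dispatch it or reject it. The sketch below mirrors that structure. The JSON parser and the `handled`/`rejected` lists are hypothetical stand-ins for `_parse_action_message`, `_dispatch_action` and `_respond_on_bad_request`, and the message payloads are invented for the example.

```python
import asyncio
import json


def parse(message: str):
    """Hypothetical parser: returns a dict, or None for malformed input."""
    try:
        return json.loads(message)
    except json.JSONDecodeError:
        return None


async def consume(queue, stop, handled, rejected):
    while not stop.is_set():
        agent, message = await queue.get()
        if message is None:
            continue
        action = parse(message)
        if action is not None:
            handled.append((agent, action))   # stands in for dispatching
        else:
            rejected.append(agent)            # stands in for the bad-request reply


async def demo():
    queue, stop = asyncio.Queue(), asyncio.Event()
    handled, rejected = [], []
    await queue.put(("a1", '{"type": "ScanNetwork"}'))
    await queue.put(("a2", "not json"))
    consumer = asyncio.create_task(consume(queue, stop, handled, rejected))
    while not queue.empty():
        await asyncio.sleep(0)                # let the consumer drain the queue
    stop.set()
    consumer.cancel()                         # the loop is blocked in queue.get()
    await asyncio.gather(consumer, return_exceptions=True)
    return handled, rejected
```

The flag is checked only between messages, so a consumer waiting on an empty queue has to be cancelled to stop. This is consistent with `start_tasks` cancelling all registered tasks during cleanup.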

shutdown_signal_handler async

Logs the signal reception and sets the shutdown flag to initiate graceful termination.

Source code in netsecgame/game/coordinator.py
async def shutdown_signal_handler(self)->None:
    """
    Logs the signal reception and sets the shutdown flag to initiate graceful termination.
    """
    self.logger.info("Shutdown signal received. Setting shutdown flag.")
    self.shutdown_flag.set()

start_tasks async

High level function to start all asynchronous tasks.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def start_tasks(self) -> None:
    """
    High level function to start all asynchronous tasks.

    Returns:
        None
    """
    loop = asyncio.get_running_loop()

    # Set up signal handlers for graceful shutdown
    loop.add_signal_handler(
        signal.SIGINT, lambda: asyncio.create_task(self.shutdown_signal_handler())
    )
    loop.add_signal_handler(
        signal.SIGTERM, lambda: asyncio.create_task(self.shutdown_signal_handler())
    )


    # Initialize configuration manager and load the configuration
    await self.config_manager.load()
    self._cyst_objects = self.config_manager.get_cyst_objects()

    if self.config_manager.get_config_hash():
        self._CONFIG_FILE_HASH = self.config_manager.get_config_hash()

    # Read configuration
    self._starting_positions_per_role = self.config_manager.get_all_starting_positions()
    self._win_conditions_per_role = self.config_manager.get_all_win_conditions()
    self._goal_description_per_role = self.config_manager.get_all_goal_descriptions()
    self._steps_limit_per_role = self.config_manager.get_all_max_steps()

    self.logger.debug(f"Timeouts set to:{self._steps_limit_per_role}")
    if self.config_manager.get_use_global_defender():
        self._global_defender = GlobalDefender()
    else:
        self._global_defender = None
    self._use_dynamic_addresses = self.config_manager.get_use_dynamic_addresses()
    self.logger.info(f"Change IP every episode set to: {self._use_dynamic_addresses}")
    self._rewards = self.config_manager.get_rewards(["step", "success", "fail", "false_positive"])
    self.logger.info(f"Rewards set to:{self._rewards}")
    self._min_required_players = self.config_manager.get_required_num_players()
    self.logger.info(f"Min player requirement set to:{self._min_required_players}")
    # run self initialization
    self._initialize()

    # start server for agent communication
    self._spawn_task(self.start_tcp_server)

    # start episode rewards task
    self._spawn_task(self._assign_rewards_episode_end)

    # start game reset task
    self._spawn_task(self._reset_game)

    # start action processing task
    self._spawn_task(self.run_game)

    while not self.shutdown_flag.is_set():
        # just wait until user terminates
        await asyncio.sleep(1)
    self.logger.debug("Final cleanup started")
    # make sure there are no running tasks left
    for task in self._tasks:
        task.cancel()  # Cancel each active task
    await asyncio.gather(*self._tasks, return_exceptions=True)  # Wait for all tasks to finish
    self.logger.info("All tasks shut down.")

start_tcp_server async

Starts the TCP server for agent communication.

Returns:

Type Description
None

None

Source code in netsecgame/game/coordinator.py
async def start_tcp_server(self) -> None:
    """
    Starts the TCP server for agent communication.

    Returns:
        None
    """
    server = None
    try:
        self.logger.info("Starting the server listening for agents")
        server = await asyncio.start_server(
            AgentServer(
                self._agent_action_queue,
                self._agent_response_queues,
                max_connections=self._min_required_players
            ),
            self.host,
            self.port
        )
        addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
        self.logger.info(f"\tServing on {addrs}")
        while not self.shutdown_flag.is_set():
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        self.logger.debug("\tStopping TCP server task.")
    except Exception as e:
        self.logger.error(f"TCP server failed: {e}")
    finally:
        if server:
            server.close()
            await server.wait_closed()
        self.logger.info("\tTCP server task stopped")
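
`asyncio.start_server` accepts any callable taking `(reader, writer)`, which is how an object such as `AgentServer` can serve as the connection handler. The sketch below uses a hypothetical `EchoHandler` class on a loopback address with an OS-assigned port. It illustrates the mechanism only and says nothing about what `AgentServer` does with a connection.

```python
import asyncio


class EchoHandler:
    """Hypothetical callable handler; asyncio calls it with (reader, writer)."""

    async def __call__(self, reader, writer):
        data = await reader.readline()
        writer.write(data.upper())
        await writer.drain()
        writer.close()
        await writer.wait_closed()


async def demo():
    # Port 0 asks the OS for any free port.
    server = await asyncio.start_server(EchoHandler(), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello\n")
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply
```

Because the handler is an instance, it can hold shared state such as queues, and the server invokes it once per incoming connection.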

step async

Domain-specific method to perform an action in the environment.

Parameters:

Name Type Description Default
agent_id tuple

The identifier for the agent.

required
agent_state GameState

The current state of the agent.

required
action Action

The action to perform.

required

Returns:

Name Type Description
GameState GameState

The new state of the agent after the action.

Source code in netsecgame/game/coordinator.py
async def step(self, agent_id: tuple, agent_state: GameState, action: Action) -> GameState:
    """
    Domain-specific method to perform an action in the environment.

    Args:
        agent_id (tuple): The identifier for the agent.
        agent_state (GameState): The current state of the agent.
        action (Action): The action to perform.

    Returns:
        GameState: The new state of the agent after the action.
    """
    raise NotImplementedError
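
The methods that raise `NotImplementedError` are the extension points a concrete world is expected to override. The toy sketch below shows the idea under strong simplifications: `CoordinatorStub` is a stand-in for `GameCoordinator` reduced to two hooks, and `CounterWorld` uses integers instead of `GameState` and `Action` objects. None of these names exist in the library.

```python
import asyncio


class CoordinatorStub:
    """Stand-in for GameCoordinator, reduced to two of the domain-specific hooks."""

    async def register_agent(self, agent_id, agent_role, agent_initial_view, agent_win_condition_view):
        raise NotImplementedError

    async def step(self, agent_id, agent_state, action):
        raise NotImplementedError


class CounterWorld(CoordinatorStub):
    """Toy world: the state is an integer and each action is added to it."""

    async def register_agent(self, agent_id, agent_role, agent_initial_view, agent_win_condition_view):
        # Return (initial_state, goal_state), mirroring the documented contract.
        return agent_initial_view["value"], agent_win_condition_view["value"]

    async def step(self, agent_id, agent_state, action):
        return agent_state + action


async def demo():
    world = CounterWorld()
    agent = ("127.0.0.1", 5001)
    state, goal = await world.register_agent(agent, "Attacker", {"value": 0}, {"value": 2})
    while state < goal:
        state = await world.step(agent, state, 1)
    return state, goal
```

A real world would also override `reset`, `reset_agent` and `remove_agent`, following the signatures documented above.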