Skip to content

NetSecGame

NetSecGame is an extension of the GameCoordinator that implements the specific dynamics of the simulation while retaining the full functionality of the core game coordinator.

netsecgame.game.worlds.NetSecGame.NetSecGame

Bases: GameCoordinator

Initializes the NetSecGame world.

Parameters:

Name Type Description Default
game_host str

The host for the game server.

required
game_port int

The port for the game server.

required
task_config str

Path to the task configuration file.

required
seed Optional[int]

Random seed for reproducibility.

None
Source code in netsecgame/game/worlds/NetSecGame.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self, game_host: str, game_port: int, task_config: str, seed: Optional[int] = None):
    """
    Create an empty NetSecGame world on top of the base game coordinator.

    The parent constructor is called with no service host/port and with
    ``task_config`` as the task configuration file. All world containers
    start empty; nothing is loaded from the configuration here.

    Args:
        game_host (str): The host for the game server.
        game_port (int): The port for the game server.
        task_config (str): Path to the task configuration file.
        seed (Optional[int]): Random seed for reproducibility, forwarded to
            ``self._set_random_seed``.
    """
    super().__init__(game_host, game_port, service_host=None, service_port=None, task_config_file=task_config)

    # ---- Internal data structures of the NSG ----
    # `IP` -> host name (str) for every node in the environment
    self._ip_to_hostname = {}
    # `Network` -> `set` of `IP` objects located in that network
    self._networks = {}
    # host name (str) -> `set` of `Service` objects
    self._services = {}
    self._data_content = {}
    self._data = {}
    # Allowed connections: `IP` -> `set` of `IP` objects it may reach
    self._firewall = {}
    self._fw_blocks = {}
    self._agent_fw_rules = {}
    # Every exploit available in the environment
    self._exploits = {}
    # Hosts in which the attacker may be placed on a random start
    self.hosts_to_start = []
    self._network_mapping = {}
    self._ip_mapping = {}

    # Seed the random generators last, once the containers exist
    self._set_random_seed(seed)

_create_goal_state_from_view

Builds a goal GameState from the given view (dict). All keywords are replaced by valid options. Parameters: view (Dict[str, Any]) – the view containing goal state information; allowed_hosts (Optional[Set[IP]]) – a set of allowed hosts for random selection.

Returns:

Name Type Description
GameState GameState

The generated goal state.

Source code in netsecgame/game/worlds/NetSecGame.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def _create_goal_state_from_view(self, view:dict, allowed_hosts=None)->GameState:
    """
    Turn a goal view (dict) into a concrete GameState.

    Every section of the view is resolved through the matching
    ``_get_*_from_view`` helper, which replaces keywords by valid options.

    Args:
        view (Dict[str, Any]): The view containing goal state information.
            Must provide the keys ``known_networks``, ``controlled_hosts``,
            ``known_hosts``, ``known_services`` and ``known_data``.
        allowed_hosts (Optional[Set[IP]]): A set of allowed hosts for random
            selection; only forwarded when resolving ``controlled_hosts``.

    Returns:
        GameState: The generated goal state.
    """
    self.logger.info(f'Generating goal state from view:{view}')

    # The helpers are invoked in a fixed order (networks, controlled hosts,
    # known hosts, services, data) so any randomness they consume stays stable.
    networks = self._get_networks_from_view(view_known_networks=view["known_networks"])
    controlled = self._get_hosts_from_view(
        view_hosts=view["controlled_hosts"],
        allowed_hosts=allowed_hosts,
    )
    hosts = self._get_hosts_from_view(view_hosts=view["known_hosts"])
    services = self._get_services_from_view(view["known_services"])
    # Goal data is resolved with a "global" keyword scope and without "logs" entries
    data = self._get_data_from_view(
        view["known_data"],
        keyword_scope="global",
        exclude_types=["logs"],
    )

    goal_state = GameState(controlled, hosts, services, data, networks)
    self.logger.info(f"Generated Goal GameState:{goal_state}")
    return goal_state

_create_new_network_mapping

Generates new network addresses (preserving relative distance between networks) and maps host IPs by preserving their relative offset within the subnet.

Parameters:

Name Type Description Default
max_attempts int

Maximum number of mapping attempts.

10
seed Optional[int]

Random seed.

None

Returns:

Type Description
Tuple[Dict[Network, Network], Dict[IP, IP]]

Tuple[Dict[Network, Network], Dict[IP, IP]]: The network and IP mappings.

Source code in netsecgame/game/worlds/NetSecGame.py
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
def _create_new_network_mapping(self, max_attempts: int = 10, seed: Optional[int] = None) -> Tuple[Dict[Network, Network], Dict[IP, IP]]:
    """
    Generates new network addresses (preserving relative distance between private
    networks) and maps host IPs by preserving their relative offset within the subnet.

    Public networks each get an independently generated public address with
    their original mask. Private networks are shifted together: the first
    one (in sorted order) gets a random private base and every other private
    network is placed at the same distance from it as in the original
    topology. A candidate is accepted only if all shifted networks are still
    private. If the topology contains no private networks this step is skipped.

    Args:
        max_attempts (int): Maximum number of attempts that may fail with an
            ``IndexError`` before the process exits. Attempts rejected only
            because a shifted network is not private are retried without
            being counted.
        seed (Optional[int]): Currently unused. Randomness comes from
            ``self._faker_object`` and the module-level ``random`` generator.

    Returns:
        Tuple[Dict[Network, Network], Dict[IP, IP]]: The network and IP mappings.
        The IP mapping additionally maps the keywords ``'random'``,
        ``'all_local'`` and ``'all_attackers'`` to themselves.

    Raises:
        SystemExit: With code -1 when more than ``max_attempts`` attempts
            fail with ``IndexError``.
    """
    fake = self._faker_object
    rng = random

    mapping_nets = {}
    mapping_ips = {}

    # sort networks for deterministic processing (order should be deterministic in Python 3.7+ but we enforce it)
    sorted_networks = sorted(self._networks.keys(), key=str)

    # Public networks are mapped right away; private ones are collected so
    # that the distance between them can be preserved below.
    private_nets = []
    for net in sorted_networks:
        if netaddr.IPNetwork(str(net)).ip.is_private():
            private_nets.append(net)
        else:
            mapping_nets[net] = Network(fake.ipv4_public(), net.mask)

    # Without private networks there is nothing to align. Entering the loop
    # in that case would raise IndexError on `private_nets[0]` in every
    # attempt and terminate the process for a perfectly valid topology.
    valid_network_mapping = not private_nets
    counter_iter = 0

    while not valid_network_mapping:
        try:
            anchor = private_nets[0]
            # Pick a random start for the first private network
            new_base = netaddr.IPNetwork(f"{fake.ipv4_private()}/{anchor.mask}")
            mapping_nets[anchor] = Network(str(new_base.network), anchor.mask)

            base_orig = netaddr.IPNetwork(str(anchor))
            new_anchor_ip = netaddr.IPNetwork(str(mapping_nets[anchor])).ip
            checks = []

            for other in private_nets[1:]:
                current_orig = netaddr.IPNetwork(str(other))
                # Distance between the first private network and this one
                diff = current_orig.ip - base_orig.ip

                # Apply the same distance to the new base
                new_net_ip = new_anchor_ip + diff

                checks.append(new_net_ip.is_private())
                mapping_nets[other] = Network(str(new_net_ip), other.mask)

            if all(checks):
                valid_network_mapping = True
        except IndexError:
            # Presumably raised by netaddr when the shifted address leaves
            # the valid address range - TODO confirm.
            counter_iter += 1
            if counter_iter > max_attempts:
                self.logger.error(f"Failed to generate valid network mapping in {max_attempts} attempts - exiting.")
                # Same effect as exit(-1) without relying on the `site` builtin
                raise SystemExit(-1)

    self.logger.info(f"New network mapping: {mapping_nets}")

    # Map host IPs (preserves the offset of each host within its subnet)
    for net in sorted_networks:
        if net not in mapping_nets:
            continue

        orig_net_obj = netaddr.IPNetwork(str(net))
        new_net_obj = netaddr.IPNetwork(str(mapping_nets[net]))

        # Fallback pool, used only when an offset cannot be applied.
        # The first and last address of the subnet are excluded.
        fallback_pool = list(new_net_obj)[1:-1]
        rng.shuffle(fallback_pool)

        # Sort hosts for deterministic processing order
        sorted_hosts = sorted(self._networks[net], key=repr)

        for host in sorted_hosts:
            try:
                old_host_ip = netaddr.IPAddress(str(host))

                # Offset: (Host IP) - (Network Address)
                # e.g. 192.168.1.55 - 192.168.1.0 = 55
                offset = old_host_ip - orig_net_obj.network

                # Apply the offset to the new network
                # e.g. 10.0.0.0 + 55 = 10.0.0.55
                new_host_ip = new_net_obj.network + offset

                # The result must lie inside the new subnet and must be
                # neither its network address nor its broadcast address.
                if (new_host_ip in new_net_obj and
                    new_host_ip != new_net_obj.network and
                    new_host_ip != new_net_obj.broadcast):

                    mapping_ips[host] = IP(str(new_host_ip))

                    # Keep the fallback pool free of addresses already handed out
                    if new_host_ip in fallback_pool:
                        fallback_pool.remove(new_host_ip)
                else:
                    raise ValueError("Offset calculated invalid IP")

            except (ValueError, TypeError, netaddr.AddrFormatError):
                # Fallback: take the next address from the shuffled pool
                if fallback_pool:
                    safe_ip = fallback_pool.pop(0)
                    mapping_ips[host] = IP(str(safe_ip))
                    self.logger.warning(f"Offset failed for {host}, assigned fallback {safe_ip}")
                else:
                    # NOTE: the host stays unmapped in this case
                    self.logger.error(f"Subnet exhausted for {net}")

    # Keywords are mapped to themselves so they survive remapping
    mapping_ips['random'] = 'random'
    mapping_ips['all_local'] = 'all_local'
    mapping_ips['all_attackers'] = 'all_attackers'

    self.logger.info(f"Mapping IPs done: {mapping_ips}")
    return mapping_nets, mapping_ips

_create_state_from_view

Builds a GameState from given view. If there is a keyword 'random' used, it is replaced by a valid option at random.

Parameters:

Name Type Description Default
view Dict[str, Any]

The view containing state information.

required
add_neighboring_nets bool

Whether to add neighboring networks.

True

Returns:

Name Type Description
GameState GameState

The generated state.

Source code in netsecgame/game/worlds/NetSecGame.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def _create_state_from_view(self, view:dict, add_neighboring_nets:bool=True)->GameState:
    """
    Turn a view (dict) into a concrete GameState.
    The keyword 'random', where used, is replaced by a valid option at random.

    Controlled hosts are always added to the known hosts, and the networks of
    every controlled host are always added to the known networks.

    Args:
        view (Dict[str, Any]): The view containing state information.
        add_neighboring_nets (bool): When True, for every private network of
            a controlled host the networks 256 addresses above and below it
            are also added, provided they are private as well.

    Returns:
        GameState: The generated state.
    """
    self.logger.info(f'Generating state from view:{view}')
    # networks are re-mapped based on the current mapping in self._network_mapping
    known_networks = self._get_networks_from_view(view_known_networks=view["known_networks"])
    controlled_hosts = self._get_hosts_from_view(view_hosts=view["controlled_hosts"], allowed_hosts=self.hosts_to_start)
    discovered_hosts = self._get_hosts_from_view(view_hosts=view["known_hosts"], allowed_hosts=self.hosts_to_start)
    # a controlled host is by definition also a known host
    known_hosts = discovered_hosts.union(controlled_hosts)

    for controlled_host in controlled_hosts:
        for net in self._get_networks_from_host(controlled_host): #TODO
            if not add_neighboring_nets:
                known_networks.add(net)
                continue

            # Extending the known networks with the neighbouring ones is done
            # in the env (and not in the agent) so that the agent knows about
            # networks other than the one it is in.
            # This is wrong and should be done by the agent, not here
            # TODO remove this!
            net_obj = netaddr.IPNetwork(str(net))
            known_networks.add(net)
            if not net_obj.ip.is_private(): #TODO
                continue

            # Probe +256 first, then -256 relative to the original address
            # (i.e. -512 from the already shifted value).
            for shift in (256, -2*256):
                net_obj.value += shift
                if net_obj.ip.is_private():
                    neighbour = Network(str(net_obj.ip), net_obj.prefixlen)
                    self.logger.debug(f'\tAdding {neighbour} to agent')
                    known_networks.add(neighbour)
            # return value back to the original
            net_obj.value += 256

    known_services = self._get_services_from_view(view["known_services"])
    known_data = self._get_data_from_view(view["known_data"])
    game_state = GameState(controlled_hosts, known_hosts, known_services, known_data, known_networks)
    self.logger.info(f"Generated GameState:{game_state}")
    return game_state

_dynamic_ip_change

Changes the IP and network addresses in the environment. Parameters: max_attempts (int) – maximum number of attempts to find a valid mapping; seed (Optional[int]) – seed for the random number generator.

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
def _dynamic_ip_change(self, max_attempts:int=10, seed=None)-> None:
    """
    Changes the IP and network addresses in the environment.

    A fresh mapping is obtained from ``_create_new_network_mapping`` and then
    applied to every structure that stores addresses: ``_networks``,
    ``_firewall_original``, ``_ip_to_hostname``, ``hosts_to_start``, the
    per-role starting positions and win conditions, the per-role goal
    descriptions (textual replacement of IPs and CIDRs) and finally the
    cumulative ``_network_mapping`` / ``_ip_mapping``.

    Args:
        max_attempts (int): Maximum number of attempts to find a valid mapping.
        seed (Optional[int]): Seed for random number generator; forwarded to
            ``_create_new_network_mapping``.

    Returns:
        None

    Raises:
        KeyError: If an address stored in ``_networks``, ``_firewall_original``,
            ``_ip_to_hostname``, ``hosts_to_start`` or the cumulative mappings
            has no entry in the new mapping.
    """
    self.logger.info("Changing IP and Network addresses in the environment")
    # find a new IP and network mapping
    mapping_nets, mapping_ips = self._create_new_network_mapping(max_attempts, seed=seed)

    # update ALL data structures in the environment with the new mappings

    # self._networks
    self._networks = {
        mapping_nets[net]: {mapping_ips[ip] for ip in ips}
        for net, ips in self._networks.items()
    }

    # self._firewall_original (we do not care about the changes done during the episode)
    new_self_firewall_original = {
        mapping_ips[ip]: {mapping_ips[dst_ip] for dst_ip in dst_ips}
        for ip, dst_ips in self._firewall_original.items()
    }
    self.logger.debug(f"New FW: {new_self_firewall_original}")
    self._firewall_original = new_self_firewall_original

    # self._ip_to_hostname
    self._ip_to_hostname = {
        mapping_ips[ip]: hostname for ip, hostname in self._ip_to_hostname.items()
    }

    # Map hosts_to_start
    self.hosts_to_start = [mapping_ips[ip] for ip in self.hosts_to_start]

    def apply_mapping(d: dict, mapping: dict) -> dict:
        """
        Apply a mapping to a dictionary.
        - Keys are remapped with mapping if present.
        - Values:
            * If iterable (set/list/tuple), each element is remapped.
            * If string (or non-iterable), attempt direct remap.
        The result always maps each key to a set of values.
        """
        out = defaultdict(set)
        for k, vals in d.items():
            nk = mapping.get(k, k)

            if isinstance(vals, str) or not isinstance(vals, Iterable):
                # treat as a single atomic value
                nv = {mapping.get(vals, vals)}
            else:
                nv = {mapping.get(v, v) for v in vals}

            out[nk].update(nv)

        return dict(out)

    def remap_view(view: dict) -> dict:
        """
        Return a copy of a start-position / win-condition view with all
        addresses remapped. Unknown entries (e.g. keywords) are kept as-is.
        Layout: {'controlled_hosts': [...], 'known_hosts': [...],
        'known_data': {...}, 'known_services': {...},
        'known_networks': [...], 'known_blocks': {IP: set(IP)}}
        """
        return {
            'known_networks': [mapping_nets.get(net, net) for net in view['known_networks']],
            'controlled_hosts': [mapping_ips.get(ip, ip) for ip in view['controlled_hosts']],
            'known_hosts': [mapping_ips.get(ip, ip) for ip in view['known_hosts']],
            'known_services': {mapping_ips.get(ip, ip): services for ip, services in view['known_services'].items()},
            'known_data': {mapping_ips.get(ip, ip): data for ip, data in view['known_data'].items()},
            # 'known_blocks' is optional in the stored views
            'known_blocks': apply_mapping(view.get("known_blocks", {}), mapping_ips),
        }

    # start_position per role (only values of existing keys are replaced,
    # so updating the dict while iterating over it is safe)
    for role, start_position in self._starting_positions_per_role.items():
        self._starting_positions_per_role[role] = remap_view(start_position)
        self.logger.debug(f"Updated starting position for role {role}: {self._starting_positions_per_role[role]}")

    # win_conditions_per_role
    for role, win_condition in self._win_conditions_per_role.items():
        self._win_conditions_per_role[role] = remap_view(win_condition)
        self.logger.debug(f"Updated win condition for role {role}: {self._win_conditions_per_role[role]}")

    # goal_description_per_role
    # regex: matches IPv4 like 1.2.3.4 or 1.2.3.4/24
    ip_pattern = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b")

    def replace_ips_in_text(text: str, ip_mapping: dict, net_mapping:dict) -> str:
        """
        Replace IPs/CIDRs in text according to the mappings {IP: IP} and
        {Network: Network}. Tokens without a mapping are left untouched.
        """
        def replacer(match):
            token = match.group(0)
            try:
                if "/" in token:
                    address, prefix = token.split("/")
                    # The mask must be an int: the mapping keys are built
                    # with integer masks, so a string mask would never match.
                    return str(net_mapping.get(Network(address, int(prefix)), token))
                return str(ip_mapping.get(IP(token), token))
            except ValueError:
                return token

        return ip_pattern.sub(replacer, text)

    new_goal_description = {role:replace_ips_in_text(description, mapping_ips, mapping_nets) for role, description in self._goal_description_per_role.items()}
    self._goal_description_per_role = new_goal_description
    self.logger.debug(f"Updated goal description per role: {self._goal_description_per_role}")

    # update mappings stored in the environment: the stored value is remapped
    # so that each key keeps pointing at the current address
    for net, mapping in self._network_mapping.items():
        self._network_mapping[net] = mapping_nets[mapping]
    self.logger.debug(f"self._network_mapping: {self._network_mapping}")
    for ip, mapping in self._ip_mapping.items():
        self._ip_mapping[ip] = mapping_ips[mapping]
    self.logger.debug(f"self._ip_mapping: {self._ip_mapping}")

_execute_action

Executes the given action and updates the game state.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The action to execute.

required
agent_id Tuple[str, int]

Identifier of the agent requesting the action.

required

Returns:

Name Type Description
GameState GameState

The new game state after execution.

Source code in netsecgame/game/worlds/NetSecGame.py
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
def _execute_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Dispatch an action to the handler for its type and return the resulting state.

    Args:
        current_state (GameState): The current game state.
        action (Action): The action to execute.
        agent_id (Tuple[str, int]): Identifier of the agent requesting the action.

    Returns:
        GameState: The new game state after execution.

    Raises:
        ValueError: If ``action.type`` is none of the supported action types.
    """
    requested = action.type

    if requested == ActionType.ScanNetwork:
        return self._execute_scan_network_action(current_state, action, agent_id)
    if requested == ActionType.FindServices:
        return self._execute_find_services_action(current_state, action, agent_id)
    if requested == ActionType.FindData:
        return self._execute_find_data_action(current_state, action, agent_id)
    if requested == ActionType.ExploitService:
        return self._execute_exploit_service_action(current_state, action, agent_id)
    if requested == ActionType.ExfiltrateData:
        return self._execute_exfiltrate_data_action(current_state, action, agent_id)
    if requested == ActionType.BlockIP:
        return self._execute_block_ip_action(current_state, action, agent_id)

    # No handler matched
    raise ValueError(f"Unknown Action type or other error: '{action.type}'")

_execute_block_ip_action

Executes the BlockIP action in the environment.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The BlockIP action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
def _execute_block_ip_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Executes the BlockIP action in the environment.

    The block is applied only when both ``source_host`` and ``target_host``
    are controlled by the agent and the firewall allows the connection
    source -> target. A successful block is symmetric: the connection is
    removed from ``self._firewall`` in both directions, recorded in
    ``self._agent_fw_rules`` and ``self._fw_blocks``, and reflected in the
    blocked connections of the returned state. Blocking a host from itself
    is rejected.

    Args:
        current_state (GameState): The current game state.
        action (Action): The BlockIP action to execute. Reads the parameters
            ``source_host``, ``target_host`` and ``blocked_host``.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state (a copy; ``current_state`` is not modified).

    Raises:
        KeyError: If source and target are valid, the firewall check passes,
            and ``blocked_host`` is missing from the action parameters.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state)
    params = action.parameters
    # .get() so that the "invalid host" diagnostics below cannot raise
    # KeyError when the parameter is missing altogether.
    source_host = params.get("source_host")
    target_host = params.get("target_host")

    # Is the src in the controlled hosts?
    if "source_host" in params and source_host in current_state.controlled_hosts:
        # Is the target in the controlled hosts?
        if "target_host" in params and target_host in current_state.controlled_hosts:
            # For now there is only one FW in the main router, but this should change in the future.
            # This means we ignore the 'target_host' that would be the router where this is applied.
            if self._firewall_check(source_host, target_host):
                blocked_host = params['blocked_host']
                if target_host != blocked_host:
                    self.logger.info(f"\t\tBlockConnection {target_host} <-> {blocked_host}")
                    # both directions are blocked
                    directions = ((target_host, blocked_host), (blocked_host, target_host))

                    # record which agent is adding the blocking rule
                    for rule in directions:
                        self._agent_fw_rules.setdefault(rule, set()).add(agent_id)

                    # remove the allowed connection in each direction; a host
                    # without any firewall entry has nothing to remove
                    for src, dst in directions:
                        try:
                            self._firewall[src].discard(dst)
                            self.logger.debug(f"\t\t\t Removed rule:'{src}' -> {dst}")
                        except KeyError:
                            pass

                    # Update the FW_Rules visible to agents
                    for src, dst in directions:
                        self._fw_blocks.setdefault(src, set()).add(dst)

                    # update the state
                    for src, dst in directions:
                        next_blocked.setdefault(src, set()).add(dst)
                else:
                    self.logger.debug(f"\t\t\t Cant block connection form :'{target_host}' to '{blocked_host}'")
                # update logs (also when the block itself was rejected)
                self.update_log_file(next_data,action, target_host)
            else:
                self._record_false_positive(source_host, target_host, agent_id)
                self.logger.debug(f"\t\t\t Connection from '{source_host}->'{target_host} is blocked blocked by FW")
        else:
            self.logger.debug(f"\t\t\t Invalid target_host:'{target_host}'")
    else:
        self.logger.debug(f"\t\t\t Invalid source_host:'{source_host}'")
    return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

_execute_exfiltrate_data_action

Executes the ExfiltrateData action in the environment.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The ExfiltrateData action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
def _execute_exfiltrate_data_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Applies an ExfiltrateData action and returns the resulting game state.

    The data is copied only when every check passes, in this order: the
    target host is controlled, the source host is controlled, the firewall
    allows source -> target, the agent already knows the data on the source
    host, and the environment stores that data on the source host. On success
    the data is added both to the agent's known data for the target host and
    to the environment's data of the target host. A connection rejected by
    the firewall is reported through `_record_false_positive`.

    Args:
        current_state (GameState): The current game state.
        action (Action): The ExfiltrateData action to execute.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state)
    params = action.parameters
    data, source, target = params["data"], params["source_host"], params["target_host"]
    controlled = current_state.controlled_hosts

    def resulting_state():
        # Built at return time so it reflects every update made so far.
        return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

    self.logger.info(f"\t\tAttempting to Exfiltrate {data} from {source} to {target}")

    # Both ends of the transfer have to be under the agent's control.
    if target not in controlled:
        self.logger.debug("\t\t\tCan not exfiltrate. Target host is not controlled.")
        return resulting_state()
    self.logger.debug(f"\t\t\t {target} is under-control: {controlled}")

    if source not in controlled:
        self.logger.debug("\t\t\tCan not exfiltrate. Source host is not controlled.")
        return resulting_state()
    self.logger.debug(f"\t\t\t {source} is under-control: {controlled}")

    if not self._firewall_check(source, target):
        self._record_false_positive(source, target, agent_id)
        self.logger.debug(f"\t\t\tConnection {source} -> {target} blocked by FW. Skipping")
        return resulting_state()

    # The agent must have discovered this data on the source host already.
    agent_data = current_state.known_data
    if source not in agent_data or data not in agent_data[source]:
        self.logger.debug("\t\t\tCan not exfiltrate. Agent did not find this data yet.")
        return resulting_state()

    # update logs
    self.update_log_file(next_data, action, target)

    # The environment itself must hold the data on the source host.
    source_name = self._ip_to_hostname[source]
    if source_name not in self._data:
        self.logger.debug("\t\t\tCan not exfiltrate. Source host does not have any data.")
        return resulting_state()
    if data not in self._data[source_name]:
        self.logger.debug("\t\t\tCan not exfiltrate. Source host does not have this data.")
        return resulting_state()

    self.logger.debug("\t\t\t Data present in the source_host")
    next_data.setdefault(target, set()).add(data)
    # Remember in the environment that the target host now stores the data too.
    self._data.setdefault(self._ip_to_hostname[target], set()).add(data)
    return resulting_state()

_execute_exploit_service_action

Executes the ExploitService action in the environment.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The ExploitService action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
def _execute_exploit_service_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Executes the ExploitService action in the environment.

    The exploit succeeds when the source host is controlled, the target IP
    exists, the firewall allows the connection, the target really runs the
    requested service and the agent already knows about that service. On
    success the target becomes a controlled host and the networks it belongs
    to become known. The target is not required to be a known host, so blind
    attempts are possible.

    Args:
        current_state (GameState): The current game state.
        action (Action): The ExploitService action to execute.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state)
    params = action.parameters
    target_host = params["target_host"]
    target_service = params["target_service"]
    # `.get` so that an action without a source host is rejected and logged
    # below instead of raising KeyError while formatting the log message.
    source_host = params.get("source_host")
    # We don't check if the target is a known_host because it can be a blind attempt to attack
    self.logger.info(f"\t\tAttempting to ExploitService in '{target_host}':'{target_service}'")

    if "source_host" not in params or source_host not in current_state.controlled_hosts:
        self.logger.debug(f"\t\t\t Invalid source_host:'{source_host}'")
    elif target_host not in self._ip_to_hostname:  # is it existing IP?
        self.logger.debug("\t\t\tCan not exploit. Target host does not exist.")
    elif not self._firewall_check(source_host, target_host):
        self._record_false_positive(source_host, target_host, agent_id)
        self.logger.debug(f"\t\t\tConnection {source_host} -> {target_host} blocked by FW. Skipping")
    else:
        target_name = self._ip_to_hostname[target_host]
        if target_name not in self._services:  # does it have any services?
            self.logger.debug("\t\t\tCan not exploit. Target host does not have any services.")
        elif target_service not in self._services[target_name]:  # does it have the service in question?
            self.logger.debug("\t\t\tCan not exploit. Target host does not the service that was attempted.")
        elif target_host not in next_services:  # does the agent know about any services this host have?
            self.logger.debug("\t\t\tCan not exploit. Agent does not know about target host having any service")
        elif target_service not in next_services[target_host]:
            self.logger.debug("\t\t\tCan not exploit. Agent does not know about target host selected service")
        else:
            self.logger.debug("\t\t\tValid service")
            if target_host not in next_controlled_h:
                next_controlled_h.add(target_host)
                self.logger.debug("\t\tAdding to controlled_hosts")
            new_networks = self._get_networks_from_host(target_host)
            self.logger.debug(f"\t\t\tFound {len(new_networks)}: {new_networks}")
            next_nets = next_nets.union(new_networks)
        # update logs (every attempt that passed the firewall is logged, successful or not)
        self.update_log_file(next_data, action, target_host)
    return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

_execute_find_data_action

Executes the FindData action in the environment.

Parameters:

Name Type Description Default
current GameState

The current game state.

required
action Action

The FindData action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
def _execute_find_data_action(self, current: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Executes the FindData action in the environment.

    When the source host is controlled and the firewall allows the
    connection, the data and the known firewall blocks reported for the
    target host are merged into the agent's state. A connection rejected by
    the firewall is reported through `_record_false_positive`.

    Args:
        current (GameState): The current game state.
        action (Action): The FindData action to execute.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current)
    params = action.parameters
    target_host = params["target_host"]
    # `.get` so that an action without a source host is rejected and logged
    # below instead of raising KeyError while formatting the log message.
    source_host = params.get("source_host")
    self.logger.debug(f"\t\tSearching for data in {target_host}")

    if "source_host" not in params or source_host not in current.controlled_hosts:
        self.logger.debug(f"\t\t\t Invalid source_host:'{source_host}'")
    elif not self._firewall_check(source_host, target_host):
        self._record_false_positive(source_host, target_host, agent_id)
        self.logger.debug(f"\t\t\tConnection {source_host} -> {target_host} blocked by FW. Skipping")
    else:
        # update logs before getting the data so this action is listed there
        self.update_log_file(next_data, action, target_host)
        new_data = self._get_data_in_host(target_host, current.controlled_hosts)
        self.logger.debug(f"\t\t\t Found {len(new_data)}: {new_data}")
        if new_data:
            # union() always builds a fresh set, so the returned state never
            # shares a set object with the environment's internal storage.
            next_data[target_host] = next_data.get(target_host, set()).union(new_data)
        # ADD KNOWN FW BLOCKS
        new_blocks = self._get_known_blocks_in_host(target_host, current.controlled_hosts)
        if new_blocks:
            next_blocked[target_host] = next_blocked.get(target_host, set()).union(new_blocks)
    return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

_execute_find_services_action

Executes the FindServices action in the environment.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The FindServices action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
def _execute_find_services_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Executes the FindServices action in the environment.

    When the source host is controlled and the firewall allows the
    connection, the services found on the target host replace the agent's
    known services for that host. A target that was not known before is added
    to the known hosts (together with its networks) only if at least one
    service was found.

    Args:
        current_state (GameState): The current game state.
        action (Action): The FindServices action to execute.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state)
    params = action.parameters
    target_host = params["target_host"]
    # `.get` so that an action without a source host is rejected and logged
    # below instead of raising KeyError while formatting the log message.
    source_host = params.get("source_host")
    self.logger.debug(f"\t\tSearching for services in {target_host}")

    if "source_host" not in params or source_host not in current_state.controlled_hosts:
        self.logger.debug(f"\t\t\t Invalid source_host:'{source_host}'")
    elif not self._firewall_check(source_host, target_host):
        self._record_false_positive(source_host, target_host, agent_id)
        self.logger.debug(f"\t\t\tConnection {source_host} -> {target_host} blocked by FW. Skipping")
    else:
        found_services = self._get_services_from_host(target_host, current_state.controlled_hosts)
        self.logger.debug(f"\t\t\tFound {len(found_services)}: {found_services}")
        if found_services:
            next_services[target_host] = found_services
            # if host was not known, add it to the known_hosts ONLY if there are some found services
            if target_host not in next_known_h:
                self.logger.debug(f"\t\tAdding {target_host} to known_hosts")
                next_known_h.add(target_host)
                next_nets = next_nets.union(self._get_networks_from_host(target_host))
        # update logs
        self.update_log_file(next_data, action, target_host)
    return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

_execute_scan_network_action

Executes the ScanNetwork action in the environment.

Parameters:

Name Type Description Default
current_state GameState

The current game state.

required
action Action

The ScanNetwork action to execute.

required
agent_id Tuple[str, int]

Identifier of the requesting agent.

required

Returns:

Name Type Description
GameState GameState

The updated game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
def _execute_scan_network_action(self, current_state: GameState, action: Action, agent_id: Tuple[str, int]) -> GameState:
    """
    Executes the ScanNetwork action in the environment.

    Every IP of the environment that lies inside the target network and is
    reachable from the source host through the firewall is added to the known
    hosts. Each IP rejected by the firewall is reported through
    `_record_false_positive`.

    Args:
        current_state (GameState): The current game state.
        action (Action): The ScanNetwork action to execute.
        agent_id (Tuple[str, int]): Identifier of the requesting agent.

    Returns:
        GameState: The updated game state.
    """
    next_nets, next_known_h, next_controlled_h, next_services, next_data, next_blocked = state_parts_deep_copy(current_state)
    params = action.parameters
    target_network = params["target_network"]
    # `.get` so that an action without a source host is rejected and logged
    # below instead of raising KeyError while formatting the log message.
    source_host = params.get("source_host")
    self.logger.debug(f"\t\tScanning {target_network}")

    if "source_host" in params and source_host in current_state.controlled_hosts:
        # The scanned network does not change between iterations: parse it once.
        scanned_net = netaddr.IPNetwork(str(target_network))
        new_ips = set()
        for ip in self._ip_to_hostname:  # check if IP exists
            self.logger.debug(f"\t\tChecking if {ip} in {target_network}")
            if str(ip) not in scanned_net:
                continue
            if self._firewall_check(source_host, ip):
                self.logger.debug(f"\t\t\tAdding {ip} to new_ips")
                new_ips.add(ip)
                self.update_log_file(next_data, action, ip)
            else:
                self._record_false_positive(source_host, ip, agent_id)
                self.logger.debug(f"\t\t\tConnection {source_host} -> {ip} blocked by FW. Skipping")
        next_known_h = next_known_h.union(new_ips)
    else:
        self.logger.debug(f"\t\t\t Invalid source_host:'{source_host}'")
    return GameState(next_controlled_h, next_known_h, next_services, next_data, next_nets, next_blocked)

_firewall_check

Checks if the firewall allows a connection from source to destination.

Parameters:

Name Type Description Default
src_ip IP

Source host IP.

required
dst_ip IP

Destination host IP.

required

Returns:

Name Type Description
bool bool

True if connection is allowed, False otherwise.

Source code in netsecgame/game/worlds/NetSecGame.py
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
def _firewall_check(self, src_ip: IP, dst_ip: IP) -> bool:
    """
    Tells whether a connection from `src_ip` to `dst_ip` passes the firewall.

    Args:
        src_ip (IP): IP of the host initiating the connection.
        dst_ip (IP): IP of the host being contacted.

    Returns:
        bool: True when `dst_ip` is among the destinations allowed for
            `src_ip`; False when it is not, or when `src_ip` has no
            firewall entry at all.
    """
    try:
        return dst_ip in self._firewall[src_ip]
    except KeyError:
        # No rules registered for this source: nothing is allowed.
        return False

_get_all_local_ips

Returns all private IP addresses present in the environment.

Returns:

Type Description
Set[IP]

Set[IP]: A set of private IPs.

Source code in netsecgame/game/worlds/NetSecGame.py
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
def _get_all_local_ips(self) -> Set[IP]:
    """
    Collects the IPs of all hosts that sit in a private network.

    A network counts as private when `netaddr` reports its address as
    private. Each member IP is translated through `self._ip_mapping` before
    it is added to the result.

    Returns:
        Set[IP]: A set of private IPs.
    """
    local_ips = {
        self._ip_mapping[member]
        for network, members in self._networks.items()
        if netaddr.IPNetwork(str(network)).ip.is_private()
        for member in members
    }
    self.logger.info(f"\t\t\tLocal ips: {local_ips}")
    return local_ips

_get_data_content

Returns the content of data identified by a host IP and data ID.

Parameters:

Name Type Description Default
host_ip IP

The IP of the host.

required
data_id str

The identifier of the data.

required

Returns:

Type Description
Optional[str]

Optional[str]: The content string if found, else None.

Source code in netsecgame/game/worlds/NetSecGame.py
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
def _get_data_content(self, host_ip: IP, data_id: str) -> Optional[str]:
    """
    Looks up the content stored for a piece of data on a host.

    The content is keyed by the pair (hostname, data_id), where the hostname
    is resolved from `host_ip`.

    Args:
        host_ip (IP): The IP of the host.
        data_id (str): The identifier of the data.

    Returns:
        Optional[str]: The content if the host exists and holds the data,
            otherwise None.
    """
    if host_ip not in self._ip_to_hostname:  # unknown IP
        self.logger.debug("Data content not found because target IP does not exists.")
        return None

    hostname = self._ip_to_hostname[host_ip]
    content_key = (hostname, data_id)
    if content_key not in self._data_content:
        self.logger.debug(f"\tData '{data_id}' not found in host '{hostname}'({host_ip})")
        return None
    return self._data_content[content_key]

_get_data_from_view

Parses view and translates all keywords. Produces dict of known data {IP: set(Data)}

Parameters:

Name Type Description Default
view_known_data Dict[IP, Iterable]

The view containing known data information.

required
keyword_scope str

Scope of keywords like 'random' or 'all'. Defaults to "host".

'host'
exclude_types List[str]

List of data types to exclude. Defaults to ["log"].

['log']

Returns:

Type Description
Dict[IP, Set[Data]]

Dict[IP, Set[Data]]: A dictionary mapping IP addresses to sets of known data.

Source code in netsecgame/game/worlds/NetSecGame.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def _get_data_from_view(self, view_known_data:dict, keyword_scope:str="host", exclude_types=["log"])->Dict[IP, Set[Data]]:
    """
    Parses view and translates all keywords. Produces dict of known data {IP: set(Data)}

    Explicit `Data` entries are taken as they are. The keywords 'random' and
    'all' are resolved against a pool of candidates: the data stored on the
    host itself when `keyword_scope` is "host", otherwise the data of every
    host. Data whose type is in `exclude_types`, or that is already selected
    for the IP, is never a candidate.

    Args:
        view_known_data (Dict[IP, Iterable]): The view containing known data information.
        keyword_scope (str): Scope of keywords like 'random' or 'all'. Defaults to "host".
        exclude_types (List[str]): List of data types to exclude. Defaults to ["log"].
            The default list is only read, never modified.

    Returns:
        Dict[IP, Set[Data]]: A dictionary mapping IP addresses to sets of known data.
    """
    known_data = {}
    for ip, data_list in view_known_data.items():
        self.logger.debug(f'\tParsing data from {ip}: {data_list}')
        known_data[ip] = set()
        for datum in data_list:
            if isinstance(datum, Data):
                known_data[ip].add(datum)
                self.logger.debug(f'\tAdding {datum}.')
            elif isinstance(datum, str):
                # select candidates that are not explicitly listed
                if keyword_scope == "host": # scope of the keyword is the host only
                    # A host that stores no data has no entry in self._data;
                    # treat it as an empty pool instead of raising KeyError.
                    pool = self._data.get(self._ip_to_hostname[ip], ())
                else:
                    # scope of the keyword is all hosts
                    pool = (d for datapoints in self._data.values() for d in datapoints)
                data_candidates = {
                    d for d in pool
                    if d.type not in exclude_types and d not in known_data[ip]
                }
                if datum == "random": # randomly select the data
                    self.logger.info("\tSelecting data randomly")
                    if len(data_candidates) == 0:
                        self.logger.warning("\t\tNo available data. Skipping")
                    else:
                        # randomly select from candidates
                        selected = random.choice(list(data_candidates))
                        self.logger.debug(f"\t\tAdding: {selected}")
                        known_data[ip].add(selected)
                elif datum == "all":
                    self.logger.info(f"\tSelecting all data in {ip}")
                    known_data[ip].update(data_candidates)
                else:
                    self.logger.error(f"Unsupported value encountered in view_known_data: {datum}")
            else:
                self.logger.error(f"Unsupported value encountered in view_known_data: {datum}")
    return known_data

_get_data_in_host

Returns a set of data objects found on a given host if it is controlled.

Parameters:

Name Type Description Default
host_ip IP

The IP of the host.

required
controlled_hosts Set[IP]

Set of controlled host IPs.

required

Returns:

Type Description
Set[Data]

Set[Data]: Set of data objects found.

Source code in netsecgame/game/worlds/NetSecGame.py
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
def _get_data_in_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[Data]:
    """
    Returns a set of data objects found on a given host if it is controlled.

    The result is always a new set, so callers can store or modify it without
    touching the environment's own record of the host's data.

    Args:
        host_ip (IP): The IP of the host.
        controlled_hosts (Set[IP]): Set of controlled host IPs.

    Returns:
        Set[Data]: Set of data objects found. Empty when the host is not
            controlled, is unknown, or stores no data.
    """
    if host_ip not in controlled_hosts:  # only return data if the agent controls the host
        self.logger.debug("\t\t\tCan't get data in host. The host is not controlled.")
        return set()
    if host_ip in self._ip_to_hostname:
        hostname = self._ip_to_hostname[host_ip]
        if hostname in self._data:
            # Copy: handing out the internal set would let later changes of
            # the environment leak into states that were already returned.
            return set(self._data[hostname])
    return set()

_get_hosts_from_view

Parses the view and translates all keywords, producing the set of controlled hosts (IPs). Arguments: `view_hosts` (Iterable) is the view containing host information; `allowed_hosts` (Optional[List[IP]]) is a list of hosts to choose from when 'random' is specified.

Returns:

Type Description
Set[IP]

Set[IP]: A set of controlled hosts.

Source code in netsecgame/game/worlds/NetSecGame.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def _get_hosts_from_view(self, view_hosts:Iterable, allowed_hosts=None)->Set[IP]:
    """
    Translates a host view into a concrete set of host IPs.

    Entries that are already `IP` objects are kept. The keyword 'random'
    picks one host, from `allowed_hosts` when given and otherwise from all
    hosts of the environment. The keyword 'all_local' adds every local IP.
    Any other entry is logged as an error and ignored.

    Args:
        view_hosts (Iterable): The view containing host information.
        allowed_hosts (Optional[List[IP]]): A list of hosts to start from if 'random' is specified.

    Returns:
        Set[IP]: A set of controlled hosts.
    """
    selected_hosts = set()
    self.logger.debug(f'\tParsing hosts from view: {view_hosts}')
    for entry in view_hosts:
        if isinstance(entry, IP):
            selected_hosts.add(entry)
            self.logger.debug(f'\tAdding {entry}.')
        elif entry == 'random':
            # Random start
            if allowed_hosts is None:
                every_host = list(self._ip_to_hostname.keys())
                self.logger.debug(f'\tChoosing randomly from all available hosts {every_host}')
                pick = random.choice(every_host)
            else:
                self.logger.debug(f'\tChoosing randomly from {allowed_hosts}')
                pick = random.choice(allowed_hosts)
            selected_hosts.add(pick)
            self.logger.debug(f'\t\tAdding {pick}.')
        elif entry == "all_local":
            # all local ips
            self.logger.debug('\tAdding all local hosts')
            selected_hosts.update(self._get_all_local_ips())
        else:
            self.logger.error(f"Unsupported value encountered in view_hosts: {entry}")
    return selected_hosts

_get_known_blocks_in_host

Returns a set of known firewall blocks from a host if it is controlled.

Parameters:

Name Type Description Default
host_ip IP

The IP of the host.

required
controlled_hosts Set[IP]

Set of controlled host IPs.

required

Returns:

Type Description
Set[IP]

Set[IP]: Set of blocked IPs.

Source code in netsecgame/game/worlds/NetSecGame.py
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
def _get_known_blocks_in_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[IP]:
    """
    Returns a set of known firewall blocks from a host if it is controlled.

    The result is always a new set, so callers can store or modify it without
    touching the environment's own record of the host's firewall blocks.

    Args:
        host_ip (IP): The IP of the host.
        controlled_hosts (Set[IP]): Set of controlled host IPs.

    Returns:
        Set[IP]: Set of blocked IPs. Empty when the host is not controlled,
            is unknown, or has no recorded blocks.
    """
    if host_ip not in controlled_hosts:  # only return blocks if the agent controls the host
        self.logger.debug("\t\t\tCan't get known blocks in host. The host is not controlled.")
        return set()
    if host_ip in self._ip_to_hostname and host_ip in self._fw_blocks:
        # Copy: handing out the internal set would let later changes of the
        # environment leak into states that were already returned.
        return set(self._fw_blocks[host_ip])
    return set()

_get_networks_from_host

Returns the set of networks the host is part of.

Parameters:

Name Type Description Default
host_ip IP

The IP of the host.

required

Returns:

Type Description
Set[Network]

Set[Network]: Set of networks.

Source code in netsecgame/game/worlds/NetSecGame.py
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
def _get_networks_from_host(self, host_ip: IP) -> Set[Network]:
    """
    Finds every network that lists the given host among its members.

    Args:
        host_ip (IP): The IP of the host.

    Returns:
        Set[Network]: Networks containing `host_ip`; empty if there are none.
    """
    return {network for network, members in self._networks.items() if host_ip in members}

_get_networks_from_view

Parses the view and translates all keywords, producing the set of known networks (Network). Arguments: `view_known_networks` (Iterable) is the view containing known-network information.

Returns:

Type Description
Set[Network]

Set[Network]: A set of known networks.

Source code in netsecgame/game/worlds/NetSecGame.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def _get_networks_from_view(self, view_known_networks:Iterable)->Set[Network]:
    """
    Parses view and translates all keywords. Produces set of known networks (Network).

    Explicit `Network` entries are translated through `self._network_mapping`.
    The keyword 'random' picks one network of the environment, and the
    keyword 'all_local' adds every private network. Any other entry is
    logged as an error and ignored.

    Args:
        view_known_networks (Iterable): The view containing known networks information.

    Returns:
        Set[Network]: A set of known networks.
    """
    known_networks = set()
    for net in view_known_networks:
        if isinstance(net, Network):
            known_networks.add(self._network_mapping[net])
            self.logger.debug(f'\tAdding network {self._network_mapping[net]}.')
        elif net == 'random':
            # Randomly select a network from the available ones
            selected = random.choice(list(self._networks.keys()))
            known_networks.add(self._network_mapping[selected])
            self.logger.debug(f'\tAdding randomly selected network: {self._network_mapping[selected]}.')
        elif net == "all_local":
            # all local networks
            self.logger.debug('\t\tAdding all local private networks')
            for n in self._networks.keys():
                # "local" means private: keep the private networks, not the
                # public ones (the condition used to be inverted).
                if n.is_private():
                    known_networks.add(self._network_mapping[n])
        else:
            self.logger.error(f"Unsupported value encountered in start_position['known_networks']: {net}")
    return known_networks

_get_services_from_host

Returns a set of services found on a given host.

Parameters:

Name Type Description Default
host_ip IP

The IP of the host.

required
controlled_hosts Set[IP]

Set of controlled host IPs.

required

Returns:

Type Description
Set[Service]

Set[Service]: Set of services found on the host.

Source code in netsecgame/game/worlds/NetSecGame.py
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
def _get_services_from_host(self, host_ip: IP, controlled_hosts: Set[IP]) -> Set[Service]:
    """
    Collects the services that are visible on a host.

    The host is looked up in `self._ip_to_hostname` and its services in `self._services`.
    When the host is among `controlled_hosts`, all of its services are returned; otherwise
    only the services whose `is_local` flag is false are returned.

    Args:
        host_ip (IP): The IP of the host.
        controlled_hosts (Set[IP]): Set of controlled host IPs.

    Returns:
        Set[Service]: Services found on the host; an empty set when the IP is unknown
        or the host has no registered services.
    """
    # unknown IP -> nothing to report
    if host_ip not in self._ip_to_hostname:
        self.logger.debug("\tServices not found because target IP does not exists.")
        return set()

    hostname = self._ip_to_hostname[host_ip]
    # host without any registered service
    if hostname not in self._services:
        self.logger.debug("\tServices not found because host does have any service.")
        return set()

    host_services = self._services[hostname]
    if host_ip in controlled_hosts:
        # controlled host: local services are included as well
        return set(host_services)
    return {svc for svc in host_services if not svc.is_local}

_get_services_from_view

Parses view and translates all keywords. Produces dict of known services {IP: set(Service)}

Parameters:

Name Type Description Default
view_known_services Dict[IP, Iterable]

The view containing known services information.

required

Returns:

Type Description
Dict[IP, Set[Service]]

Dict[IP, Set[Service]]: A dictionary mapping IP addresses to sets of known services.

Source code in netsecgame/game/worlds/NetSecGame.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def _get_services_from_view(self, view_known_services:dict)->Dict[IP, Set[Service]]:
    """
    Parses view and translates all keywords. Produces dict of known services {IP: set(Service)}

    Supported entries of each per-IP list:
        * 'random'  - one service of the host that is not yet in the result for that IP
                      (skipped with a warning when no candidate is left)
        * 'all'     - every service registered for the host
        * `Service` - an explicit service object

    Any other value is reported with `self.logger.error` and skipped. The keys of the
    returned dict are the IPs exactly as given in the view; no re-mapping is done here.

    Args:
        view_known_services (Dict[IP, Iterable]): The view containing known services information.

    Returns:
        Dict[IP, Set[Service]]: A dictionary mapping IP addresses to sets of known services.
    """
    # TODO: Add keyword scope parameter (like in _get_data_from_view)
    known_services = {}
    for ip, service_list in view_known_services.items():
        self.logger.debug(f'\tParsing services from {ip}: {service_list}')
        known_services[ip] = set()
        for service in service_list:
            if isinstance(service, str):
                # keywords are plain strings; all of them are resolved in this branch
                if service == "random": # randomly select the service
                    self.logger.info(f"\tSelecting service randomly in {ip}")
                    # select candidates that are not explicitly listed
                    service_candidates = [s for s in self._services[self._ip_to_hostname[ip]] if s not in known_services[ip]]
                    if len(service_candidates) == 0:
                        self.logger.warning("\t\tNo available services. Skipping")
                    else:
                        # randomly select from candidates
                        selected = random.choice(service_candidates)
                        self.logger.debug(f"\t\tAdding: {selected}")
                        known_services[ip].add(selected)
                elif service == "all":
                    self.logger.info(f"\tSelecting all services in {ip}")
                    known_services[ip].update(self._services[self._ip_to_hostname[ip]])
                else:
                    self.logger.error(f"Unsupported value encountered in view_known_services: {service}")
            elif isinstance(service, Service):
                known_services[ip].add(service)
                self.logger.debug(f'\tAdding {service}.')
            else:
                self.logger.error(f"Unsupported value encountered in view_known_services: {service}")
    return known_services

_initialize

Initializes the NetSecGame environment.

Loads the CYST configuration, sets up dynamic IP and network address generation if enabled, and stores original copies of environment data structures for later resets. Also seeds the random number generator for reproducibility and logs the completion of initialization.

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def _initialize(self)->None:
    """
    Builds the environment from the loaded CYST configuration.

    When `self._cyst_objects` is missing, an error is logged and nothing else happens.
    Otherwise the configuration is processed, a `Faker` object is created and seeded with
    `self._seed` if dynamic addresses are in use, and deep copies of `self._data`,
    `self._data_content` and `self._firewall` are stored under the matching
    `*_original` attributes so they can be restored later.

    Returns:
        None
    """
    if self._cyst_objects is None:
        self.logger.error("CYST configuration not loaded, cannot initialize the environment!")
        return

    # Load CYST configuration
    self._process_cyst_config(self._cyst_objects)

    # Dynamic network and IP addresses need a seeded Faker instance
    if self._use_dynamic_addresses:
        self.logger.info("Dynamic change of the IP and network addresses enabled")
        self._faker_object = Faker()
        Faker.seed(self._seed)

    # Snapshot the structures that are modified during the game
    for attr_name in ("_data", "_data_content", "_firewall"):
        setattr(self, f"{attr_name}_original", copy.deepcopy(getattr(self, attr_name)))
    self.logger.info("Environment initialization finished")

_process_cyst_config

Processes the CYST configuration objects to set up the environment.

Parameters:

Name Type Description Default
configuration_objects List[Any]

List of configuration objects from CYST.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
def _process_cyst_config(self, configuration_objects: List[Any]) -> None:
    """
    Processes the CYST configuration objects to set up the environment.

    The objects are first sorted by type. Nodes are processed before routers, and the
    firewall is built afterwards because it needs the complete set of known IPs. The
    method fills `self._ip_to_hostname`, `self._networks`, `self._services`, `self._data`,
    `self._data_content` and `self.hosts_to_start`, replaces `self._firewall` and
    `self._exploits`, adds a system logfile to the data of every node and router, and
    creates the identity mappings in `self._network_mapping` and `self._ip_mapping`.

    Args:
        configuration_objects (List[Any]): List of configuration objects from CYST.

    Returns:
        None
    """
    nodes = []
    node_to_id = {}
    routers = []
    connections = []
    exploits = []
    node_objects = {}
    fw_rules = []
    #sort objects into categories (nodes and routers MUST be processed before connections!)
    for o in configuration_objects:
        if isinstance(o, NodeConfig):
            nodes.append(o)
        elif isinstance(o, RouterConfig):
            routers.append(o)
        elif isinstance(o, ConnectionConfig):
            connections.append(o)
        elif isinstance(o, ExploitConfig):
            exploits.append(o)

    def process_node_config(node_obj: NodeConfig) -> None:
        """
        Processes a single NodeConfig object.

        Registers the node's interfaces (IP -> hostname, network -> IPs), its passive
        services and the private data of those services.

        Args:
            node_obj (NodeConfig): The node configuration to process.
        """
        self.logger.info(f"\tProcessing config of node '{node_obj.id}'")
        #save the complete object
        node_objects[node_obj.id] = node_obj
        self.logger.info(f'\t\tAdded {node_obj.id} to the list of available nodes.')
        node_to_id[node_obj.id] = len(node_to_id)

        #examine interfaces
        self.logger.info(f"\t\tProcessing interfaces in node '{node_obj.id}'")
        node_ips = []  # every IP of this node, needed for the start-marker service below
        for interface in node_obj.interfaces:
            net_ip, net_mask = str(interface.net).split("/")
            net = Network(net_ip,int(net_mask))
            ip = IP(str(interface.ip))
            node_ips.append(ip)
            if len(node_obj.active_services)>0:
                self.logger.info("\tAdding as potential start point")
                self.hosts_to_start.append(ip)
            self._ip_to_hostname[ip] = node_obj.id
            if net not in self._networks:
                self._networks[net] = []
            self._networks[net].append(ip)
            self.logger.info(f'\t\tAdded network {str(interface.net)} to the list of available nets, with node {node_obj.id}.')

        #services
        self.logger.info(f"\t\tProcessing services & data in node '{node_obj.id}'")
        for service in node_obj.passive_services:
            # Check if it is a candidate for random start.
            # Be careful, this adds all the IPs of this node. The IPs collected in the
            # interface loop are used instead of the loop variable, which would be
            # unbound for a node without interfaces and would hold only the last one.
            if service.name == "can_attack_start_here":
                self.hosts_to_start.extend(node_ips)
                continue

            if node_obj.id not in self._services:
                self._services[node_obj.id] = []
            self._services[node_obj.id].append(Service(service.name, "passive", service.version, service.local))
            #data
            self.logger.info(f"\t\t\tProcessing data in node '{node_obj.id}':'{service.name}' service")
            try:
                for data in service.private_data:
                    self.logger.info(f"\t\t\t\tData: {data}")
                    if node_obj.id not in self._data:
                        self._data[node_obj.id] = set()
                    datapoint = Data(data.owner, data.description)
                    self._data[node_obj.id].add(datapoint)
                    # add content
                    self._data_content[node_obj.id, datapoint.id] = f"Content of {datapoint.id}"
            except AttributeError:
                # Service does not contain any data
                pass

    def process_router_config(router_obj: RouterConfig) -> Optional[bool]:
        """
        Processes a single RouterConfig object.

        Registers the router's interfaces like those of a node and collects the rules
        of all its traffic processors into `fw_rules`.

        Args:
            router_obj (RouterConfig): The router configuration to process.

        Returns:
            Optional[bool]: False if the router should be skipped, None otherwise.
        """
        self.logger.info(f"\tProcessing config of router '{router_obj.id}'")
        # Process a router
        # Add the router to the list of nodes. This goes
        # against CYST definition. Check if we can modify it in CYST
        if router_obj.id.lower() == 'internet':
            # Ignore the router called 'internet' because it is not a router
            # in our network
            self.logger.info("\t\tSkipping the internet router")
            return False

        node_objects[router_obj.id] = router_obj
        node_to_id[router_obj.id] = len(node_to_id)
        self.logger.info(f"\t\tProcessing interfaces in router '{router_obj.id}'")
        # iterate the router passed in, not the loop variable of the enclosing scope
        for interface in router_obj.interfaces:
            net_ip, net_mask = str(interface.net).split("/")
            net = Network(net_ip,int(net_mask))
            ip = IP(str(interface.ip))
            self._ip_to_hostname[ip] = router_obj.id
            if net not in self._networks:
                self._networks[net] = []
            self._networks[net].append(ip)

        #add Firewall rules
        self.logger.info(f"\t\tReading FW rules in router '{router_obj.id}'")
        for tp in router_obj.traffic_processors:
            for chain in tp.chains:
                for rule in chain.rules:
                    fw_rules.append(rule)

    def process_firewall() -> Dict[IP, Set[IP]]:
        """
        Processes firewall rules and generates the connectivity mapping.

        With the firewall enabled, connections are allowed inside every private network,
        from private networks to all IPs of non-private networks, and wherever an ALLOW
        rule from the configuration matches. With the firewall disabled, every IP may
        reach every IP.

        Returns:
            Dict[IP, Set[IP]]: Mapping of IP to sets of allowed destination IPs.
        """
        # process firewall rules
        all_ips = set()
        for ips in self._networks.values():
            all_ips.update(ips)
        firewall = {ip:set() for ip in all_ips}
        if self.config_manager.get_use_firewall():
            self.logger.info("Firewall enabled - processing FW rules")
            # LOCAL NETWORKS
            for net, ips in self._networks.items():
                # IF net is local, allow connection between all nodes in it
                if netaddr.IPNetwork(str(net)).ip.is_private():
                    for src in ips:
                        for dst in ips:
                            firewall[src].add(dst)

            # LOCAL TO INTERNET
            for net, ips in self._networks.items():
                # IF net is local, allow connection from its nodes to all public IPs
                if netaddr.IPNetwork(str(net)).ip.is_private():
                    for public_net, public_ips in self._networks.items():
                        if not netaddr.IPNetwork(str(public_net)).ip.is_private():
                            for src in ips:
                                for dst in public_ips:
                                    firewall[src].add(dst)
                                    #add self loop:
                                    firewall[dst].add(dst)
            # FW RULES FROM CONFIG
            for rule in fw_rules:
                if rule.policy == FirewallPolicy.ALLOW:
                    src_net = netaddr.IPNetwork(rule.src_net)
                    dst_net = netaddr.IPNetwork(rule.dst_net)
                    self.logger.info(f"\t{rule}")
                    for src_ip in all_ips:
                        if str(src_ip) in src_net:
                            for dst_ip in all_ips:
                                if str(dst_ip) in dst_net:
                                    firewall[src_ip].add(dst_ip)
                                    self.logger.info(f"\t\tAdding {src_ip} -> {dst_ip}")
        else:
            self.logger.info("Firewall disabled, allowing all connections")
            for src_ip in all_ips:
                for dst_ip in all_ips:
                    firewall[src_ip].add(dst_ip)
        return firewall

    #process Nodes
    for n in nodes:
        process_node_config(n)
    #process routers
    for r in routers:
        process_router_config(r)

    # process firewall rules
    self._firewall = process_firewall()

    self.logger.info("\tProcessing available exploits")

    #exploits
    self._exploits = exploits

    # create logfile in each nodes
    for node in nodes + routers:
        if node.id not in self._data:
            self._data[node.id] = set()
        self.logger.info(f"\tAdding logfile to node {node.id}")
        self._data[node.id].add(Data(owner="system", id="logfile", type="log", size=0))
    #create initial mapping
    self.logger.info("\tCreating initial mapping of IPs and Networks")
    for net in self._networks.keys():
        self._network_mapping[net] = net
    self.logger.info(f"\tintitial self._network_mapping: {self._network_mapping}")
    for ip in self._ip_to_hostname.keys():
        self._ip_mapping[ip] = ip
    self.logger.info(f"\tintitial self._ip_mapping: {self._ip_mapping}")
    self.logger.info("CYST configuration processed successfully")

_record_false_positive

Records a false positive if a connection block affects a benign agent.

Parameters:

Name Type Description Default
src_ip IP

Source host IP.

required
dst_ip IP

Destination host IP.

required
agent_id Tuple[str, int]

Identifier of the author agent.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
def _record_false_positive(self, src_ip: IP, dst_ip: IP, agent_id: Tuple[str, int]) -> None:
    """
    Counts a false positive for every agent whose firewall rule covers the connection.

    Nothing is recorded unless `self.is_agent_benign(agent_id)` is true. For a benign
    agent, each author listed in `self._agent_fw_rules` for the `(src_ip, dst_ip)` pair
    gets its counter in `self._agent_false_positives` increased by one. When no agent
    rule exists for the pair, only a debug message is logged.

    Args:
        src_ip (IP): Source host IP.
        dst_ip (IP): Destination host IP.
        agent_id (Tuple[str, int]): Identifier of the agent that is checked for being benign.

    Returns:
        None
    """
    # false positives only exist for benign agents
    if not self.is_agent_benign(agent_id):
        return

    connection = (src_ip, dst_ip)
    if connection not in self._agent_fw_rules:
        self.logger.debug(f"False positive for blocking {src_ip} -> {dst_ip} caused by the system configuration.")
        return

    # charge every agent that authored a rule for this connection
    for rule_author in self._agent_fw_rules[connection]:
        self.logger.info(f"Adding false positive for blocking {src_ip} -> {dst_ip} by {rule_author}")
        self._agent_false_positives[rule_author] = self._agent_false_positives.get(rule_author, 0) + 1

_set_random_seed

Sets the random seed for the environment.

Parameters:

Name Type Description Default
seed int

The random seed to set.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def _set_random_seed(self, seed)->None:
    """
    Stores the seed and applies it to the random number generators.

    The value is always saved in `self._seed`. When it is not None, it seeds
    `np.random`, `random` and, if the instance has a `_faker_object` attribute, `Faker`.
    A None seed only produces a warning.

    Args:
        seed (int): The random seed to set.

    Returns:
        None
    """
    self._seed = seed
    if seed is None:
        self.logger.warning("No seed provided, using random seed")
        return

    for seed_fn in (np.random.seed, random.seed):
        seed_fn(seed)
    # Faker is seeded only when it is in use
    if hasattr(self, '_faker_object'):
        Faker.seed(seed)
    self.logger.info(f'Setting env seed to {seed}')

register_agent async

Registers an agent and creates its initial and goal states.

Parameters:

Name Type Description Default
agent_id Tuple[str, int]

Identifier of the agent.

required
agent_role AgentRole

Role assigned to the agent.

required
agent_initial_view Dict[str, Any]

View for initial state generation.

required
agent_win_condition_view Dict[str, Any]

View for goal state generation.

required

Returns:

Type Description
Tuple[GameState, GameState]

Tuple[GameState, GameState]: (initial_state, goal_state).

Source code in netsecgame/game/worlds/NetSecGame.py
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
async def register_agent(self, agent_id: Tuple[str, int], agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]:
    """
    Builds the starting state and the goal state for a newly registered agent.

    The initial state is created first, from `agent_initial_view`, then the goal state
    from `agent_win_condition_view`. `agent_id` and `agent_role` are not used here.

    Args:
        agent_id (Tuple[str, int]): Identifier of the agent.
        agent_role (AgentRole): Role assigned to the agent.
        agent_initial_view (Dict[str, Any]): View for initial state generation.
        agent_win_condition_view (Dict[str, Any]): View for goal state generation.

    Returns:
        Tuple[GameState, GameState]: (initial_state, goal_state).
    """
    # tuple elements are evaluated left to right: initial state, then goal state
    return (
        self._create_state_from_view(agent_initial_view),
        self._create_goal_state_from_view(agent_win_condition_view),
    )

remove_agent async

Removes an agent from the game.

Parameters:

Name Type Description Default
agent_id Tuple[str, int]

Identifier of the agent.

required
agent_state GameState

Final state of the agent.

required

Returns:

Name Type Description
bool bool

Always True.

Source code in netsecgame/game/worlds/NetSecGame.py
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
async def remove_agent(self, agent_id: Tuple[str, int], agent_state: GameState) -> bool:
    """
    Handles the removal of an agent from the game.

    Neither argument is used and no work is performed; the call only reports success.

    Args:
        agent_id (Tuple[str, int]): Identifier of the agent.
        agent_state (GameState): Final state of the agent.

    Returns:
        bool: Always True.
    """
    # Nothing to clean up in this method
    return True

reset async

Resets the entire world to its initial state for a new episode.

Parameters:

Name Type Description Default
seed Optional[int]

New random seed if provided.

None
topology_change Optional[bool]

Whether a dynamic topology change should occur.

None

Returns:

Name Type Description
bool bool

Always True.

Source code in netsecgame/game/worlds/NetSecGame.py
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
async def reset(self, seed: Optional[int] = None, topology_change: Optional[bool] = None) -> bool:
    """
    Restores the world to its initial state for a new episode.

    A non-None seed is applied through `self._set_random_seed`. If dynamic addresses are
    enabled in the configuration and `topology_change` is truthy, `self._dynamic_ip_change`
    is called with the seed. Afterwards `self._data`, `self._data_content` and
    `self._firewall` are replaced by deep copies of their `*_original` counterparts, and
    `self._fw_blocks` and `self._agent_fw_rules` are emptied.

    Args:
        seed (Optional[int]): New random seed if provided.
        topology_change (Optional[bool]): Whether a dynamic topology change should occur.

    Returns:
        bool: Always True.
    """
    self.logger.info('--- Reseting NSG Environment to its initial state ---')
    if seed is not None:
        self._set_random_seed(seed)

    # the change must be allowed by the configuration and agreed on by the agents
    if self.config_manager.get_use_dynamic_addresses() and topology_change:
        self._dynamic_ip_change(seed=seed)

    # restore data, data content and firewall from their stored originals
    for attr_name in ("_data", "_data_content", "_firewall"):
        setattr(self, attr_name, copy.deepcopy(getattr(self, f"{attr_name}_original")))
    # forget everything the agents did to the firewall
    self._fw_blocks, self._agent_fw_rules = {}, {}
    return True

reset_agent async

Resets an agent's state for a new episode.

Parameters:

Name Type Description Default
agent_id Tuple[str, int]

Identifier of the agent.

required
agent_role AgentRole

Role assigned to the agent.

required
agent_initial_view Dict[str, Any]

View for initial state generation.

required
agent_win_condition_view Dict[str, Any]

View for goal state generation.

required

Returns:

Type Description
Tuple[GameState, GameState]

Tuple[GameState, GameState]: (reset_state, goal_state).

Source code in netsecgame/game/worlds/NetSecGame.py
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
async def reset_agent(self, agent_id: Tuple[str, int], agent_role: AgentRole, agent_initial_view: Dict[str, Any], agent_win_condition_view: Dict[str, Any]) -> Tuple[GameState, GameState]:
    """
    Rebuilds the starting state and the goal state of an agent for a new episode.

    The state is created first, from `agent_initial_view`, then the goal state from
    `agent_win_condition_view`. `agent_id` and `agent_role` are not used here.

    Args:
        agent_id (Tuple[str, int]): Identifier of the agent.
        agent_role (AgentRole): Role assigned to the agent.
        agent_initial_view (Dict[str, Any]): View for initial state generation.
        agent_win_condition_view (Dict[str, Any]): View for goal state generation.

    Returns:
        Tuple[GameState, GameState]: (reset_state, goal_state).
    """
    # tuple elements are evaluated left to right: reset state, then goal state
    return (
        self._create_state_from_view(agent_initial_view),
        self._create_goal_state_from_view(agent_win_condition_view),
    )

step async

Processes a single game step for an agent.

Parameters:

Name Type Description Default
agent_id Tuple[str, int]

Identifier of the agent.

required
agent_state GameState

Current state of the agent.

required
action Action

Action to perform.

required

Returns:

Name Type Description
GameState GameState

The resulting game state.

Source code in netsecgame/game/worlds/NetSecGame.py
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
async def step(self, agent_id: Tuple[str, int], agent_state: GameState, action: Action) -> GameState:
    """
    Applies one action of an agent and returns the state that results from it.

    The work is delegated to `self._execute_action`, which receives the arguments in the
    order (agent_state, action, agent_id).

    Args:
        agent_id (Tuple[str, int]): Identifier of the agent.
        agent_state (GameState): Current state of the agent.
        action (Action): Action to perform.

    Returns:
        GameState: The resulting game state.
    """
    resulting_state = self._execute_action(agent_state, action, agent_id)
    return resulting_state

update_log_file

Updates the log file on the target host with the provided action details.

Parameters:

Name Type Description Default
known_data Dict[IP, Set[Data]]

Current known data mappings.

required
action Action

The action to record in the log.

required
target_host IP

The IP of the host where the log is updated.

required

Returns:

Type Description
None

None

Source code in netsecgame/game/worlds/NetSecGame.py
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
def update_log_file(self, known_data: Dict[IP, Set[Data]], action: Action, target_host: IP) -> None:
    """
    Updates the log file on the target host with the provided action details.

    The log is the `Data` item of the host with owner "system" and type "log". Its
    content is a JSON list of records; a record with the action's source host and type
    is appended and the item is replaced by a new one with the updated content and size.
    If the host has no data entry yet, or has no log item, a new log holding only the
    new record is created.

    Args:
        known_data (Dict[IP, Set[Data]]): Current known data mappings (not used by this method).
        action (Action): The action to record in the log.
        target_host (IP): The IP of the host where the log is updated.

    Returns:
        None

    Raises:
        KeyError: If `target_host` is not in `self._ip_to_hostname` or the action has no
            "source_host" parameter.
    """
    hostname = self._ip_to_hostname[target_host]
    self.logger.debug(f"Updating log file in host {hostname}")
    record = {'source_host': str(action.parameters["source_host"]), 'action_type': str(action.type)}

    # make sure the host has a data set, so the final add cannot fail on a missing key
    host_data = self._data.setdefault(hostname, set())
    current_log_file = next(
        (item for item in host_data if item.owner == "system" and item.type == "log"),
        None,
    )
    if current_log_file is None:
        self.logger.debug(f"\t\t\tLog not found in host {hostname}. Creating new one.")
        content = []
    else:
        # Data items are replaced rather than modified, so drop the old log first
        host_data.discard(current_log_file)
        # a log of size 0 has no content to parse yet
        content = [] if current_log_file.size == 0 else json.loads(current_log_file.content)
    content.append(record)
    new_content = json.dumps(content)
    host_data.add(Data(owner="system", id="logfile", type="log", size=len(new_content), content=new_content))