Configuration manager
Configuration Manager
Configuration manager is a component of the game coordinator that handles the configuration of the game. It is responsible for loading the configuration from the YAML file and providing it to the game coordinator.
netsecgame.game.configuration_manager.ConfigurationManager(task_config_file=None, service_host=None, service_port=None)
Manages the loading and access of game configuration.
Handles fetching configuration from efficient sources (local file or remote service)
and provides structured access to configuration data.
Source code in netsecgame/game/configuration_manager.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32 | def __init__(self, task_config_file: Optional[str] = None, service_host: Optional[str] = None, service_port: Optional[int] = None):
self.logger = logging.getLogger("ConfigurationManager")
self._task_config_file = task_config_file
self._service_host = service_host
self._service_port = service_port
self._parser: Optional[ConfigParser] = None
self._cyst_objects = None
self._config_file_hash: Optional[str] = None
# Cache for parsed values
self._starting_positions: Dict[str, Any] = {}
self._win_conditions: Dict[str, Any] = {}
self._goal_descriptions: Dict[str, str] = {}
self._max_steps: Dict[str, Optional[int]] = {}
|
get_all_goal_descriptions()
Returns goal descriptions for all roles.
Source code in netsecgame/game/configuration_manager.py
181
182
183
184
185
186
187
188
189
190 | def get_all_goal_descriptions(self) -> Dict[str, str]:
"""Returns goal descriptions for all roles."""
goal_descriptions = {}
for agent_role in AgentRole:
try:
goal_descriptions[agent_role] = self.get_goal_description(role=agent_role)
except KeyError:
goal_descriptions[agent_role] = ""
self.logger.info(f"Goal description for role '{agent_role}': {goal_descriptions[agent_role]}")
return goal_descriptions
|
get_all_max_steps()
Returns max steps for all roles.
Source code in netsecgame/game/configuration_manager.py
192
193
194
195
196
197
198
199 | def get_all_max_steps(self) -> Dict[str, Optional[int]]:
"""Returns max steps for all roles."""
# Using self.get_max_steps might raise RuntimeError if checks are there,
# but simpler to just call parser directly or the single accessor since we are inside the class.
# However, the single accessor has the check.
# But wait, self.get_max_steps(role) does `self._parser.get_max_steps(role)` already.
# Iterating over AgentRole is correct.
return {role: self.get_max_steps(role) for role in AgentRole}
|
get_all_starting_positions()
Returns starting positions for all roles.
Source code in netsecgame/game/configuration_manager.py
159
160
161
162
163
164
165
166
167
168 | def get_all_starting_positions(self) -> Dict[str, Any]:
"""Returns starting positions for all roles."""
starting_positions = {}
for agent_role in AgentRole:
try:
starting_positions[agent_role] = self.get_starting_position(role=agent_role)
self.logger.info(f"Starting position for role '{agent_role}': {starting_positions[agent_role]}")
except KeyError:
starting_positions[agent_role] = {}
return starting_positions
|
get_all_win_conditions()
Returns win conditions for all roles.
Source code in netsecgame/game/configuration_manager.py
170
171
172
173
174
175
176
177
178
179 | def get_all_win_conditions(self) -> Dict[str, Any]:
"""Returns win conditions for all roles."""
win_conditions = {}
for agent_role in AgentRole:
try:
win_conditions[agent_role] = self.get_win_conditions(role=agent_role)
except KeyError:
win_conditions[agent_role] = {}
self.logger.info(f"Win condition for role '{agent_role}': {win_conditions[agent_role]}")
return win_conditions
|
get_goal_description(role)
Returns the goal description for a specific role.
Source code in netsecgame/game/configuration_manager.py
| def get_goal_description(self, role: str) -> str:
"""Returns the goal description for a specific role."""
if not self._parser:
raise RuntimeError("Configuration not loaded.")
return self._parser.get_goal_description(agent_role=role)
|
get_max_steps(role)
Returns the max steps for a specific role.
Source code in netsecgame/game/configuration_manager.py
| def get_max_steps(self, role: str) -> Optional[int]:
"""Returns the max steps for a specific role."""
if not self._parser:
raise RuntimeError("Configuration not loaded.")
return self._parser.get_max_steps(role)
|
get_rewards(reward_names=['step', 'success', 'fail', 'false_positive'], default_value=0)
Returns the rewards configuration.
Source code in netsecgame/game/configuration_manager.py
| def get_rewards(self, reward_names: List[str] = ["step", "success", "fail", "false_positive"], default_value: int = 0) -> dict:
"""Returns the rewards configuration."""
if not self._parser:
raise RuntimeError("Configuration not loaded.")
return self._parser.get_rewards(reward_names, default_value)
|
get_starting_position(role)
Returns the starting position configuration for a specific role.
Source code in netsecgame/game/configuration_manager.py
| def get_starting_position(self, role: str) -> dict:
"""Returns the starting position configuration for a specific role."""
if not self._parser:
raise RuntimeError("Configuration not loaded.")
return self._parser.get_start_position(agent_role=role)
|
get_win_conditions(role)
Returns the win conditions for a specific role.
Source code in netsecgame/game/configuration_manager.py
| def get_win_conditions(self, role: str) -> dict:
"""Returns the win conditions for a specific role."""
if not self._parser:
raise RuntimeError("Configuration not loaded.")
return self._parser.get_win_conditions(agent_role=role)
|
load()
async
Determines the source and loads the configuration.
Prioritizes remote service if configured, otherwise falls back to local file.
Source code in netsecgame/game/configuration_manager.py
34
35
36
37
38
39
40
41
42
43
44
45
46 | async def load(self) -> None:
"""
Determines the source and loads the configuration.
Prioritizes remote service if configured, otherwise falls back to local file.
"""
if self._service_host and self._service_port:
self.logger.info(f"Fetching task configuration from {self._service_host}:{self._service_port}")
await self._fetch_remote_configuration()
elif self._task_config_file:
self.logger.info(f"Loading task configuration from file: {self._task_config_file}")
self._load_local_configuration()
else:
raise ValueError("Task configuration source not specified (neither file nor service)")
|
ConfigParser
ConfigParser is a class that is responsible for parsing the YAML configuration file and providing it to the game coordinator.
netsecgame.game.config_parser.ConfigParser(task_config_file=None, config_dict=None)
Class to deal with the configuration file of NetSecGame Coordinator
Args:
task_config_file (str|None): Path to the configuration file
config_dict (dict|None): Dictionary with configuration data
Initializes the configuration parser. Required either path to a confgiuration file or a dict with configuraitons.
Source code in netsecgame/game/config_parser.py
21
22
23
24
25
26
27
28
29
30
31 | def __init__(self, task_config_file:str|None=None, config_dict:dict|None=None):
"""
Initializes the configuration parser. Required either path to a confgiuration file or a dict with configuraitons.
"""
self.logger = logging.getLogger('ConfigParser')
if task_config_file:
self.read_config_file(task_config_file)
elif config_dict:
self.config = config_dict
else:
self.logger.error("You must provide either the configuration file or a dictionary with the configuration!")
|
get_goal_description(agent_role)
Get goal description per role
Source code in netsecgame/game/config_parser.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318 | def get_goal_description(self, agent_role)->str:
"""
Get goal description per role
"""
match agent_role:
case "Attacker":
try:
description = self.config['coordinator']['agents'][agent_role]["goal"]["description"]
except KeyError:
description = ""
case "Defender":
try:
description = self.config['coordinator']['agents'][agent_role]["goal"]["description"]
except KeyError:
description = ""
case "Benign":
description = ""
case _:
raise ValueError(f"Unsupported agent role: {agent_role}")
return description
|
get_max_steps(role=str)
Get the max steps based on agent's role
Source code in netsecgame/game/config_parser.py
285
286
287
288
289
290
291
292
293
294
295
296
297 | def get_max_steps(self, role=str)->Optional[int]:
"""
Get the max steps based on agent's role
"""
try:
max_steps = int(self.config['coordinator']['agents'][role]["max_steps"])
except KeyError:
max_steps = None
self.logger.warning(f"Item 'max_steps' not found in 'coordinator.agents.{role}'!. Setting value to default=None (no step limit)")
except TypeError as e:
max_steps = None
self.logger.warning(f"Unsupported value in 'coordinator.agents.{role}.max_steps': {e}. Setting value to default=None (no step limit)")
return max_steps
|
get_player_start_position(type_of_player)
Generate the starting position of an attacking agent
type_of_player: Can be 'attackers' or 'defenders'
Source code in netsecgame/game/config_parser.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247 | def get_player_start_position(self, type_of_player:str):
"""
Generate the starting position of an attacking agent
type_of_player: Can be 'attackers' or 'defenders'
"""
# Read known nets
known_networks = self.read_agents_known_networks(type_of_player, 'start_position')
# Read known hosts
known_hosts = self.read_agents_known_hosts(type_of_player, 'start_position')
# Read controlled hosts
controlled_hosts = self.read_agents_controlled_hosts(type_of_player, 'start_position')
# Start services
known_services = self.read_agents_known_services(type_of_player, 'start_position')
# Start data
known_data = self.read_agents_known_data(type_of_player, 'start_position')
player_start_position = {}
player_start_position['known_networks'] = known_networks
player_start_position['controlled_hosts'] = controlled_hosts
player_start_position['known_hosts'] = known_hosts
player_start_position['known_data'] = known_data
player_start_position['known_services'] = known_services
return player_start_position
|
get_player_win_conditions(type_of_player)
Get the goal of the player
type_of_player: Can be 'attackers' or 'defenders'
Source code in netsecgame/game/config_parser.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218 | def get_player_win_conditions(self, type_of_player:str):
"""
Get the goal of the player
type_of_player: Can be 'attackers' or 'defenders'
"""
# Read known nets
known_networks = self.read_agents_known_networks(type_of_player, 'goal')
# Read known hosts
known_hosts = self.read_agents_known_hosts(type_of_player, 'goal')
# Read controlled hosts
controlled_hosts = self.read_agents_controlled_hosts(type_of_player, 'goal')
# Goal services
known_services = self.read_agents_known_services(type_of_player, 'goal')
# Read known blocks
known_blocks = self.read_agents_known_blocks(type_of_player, 'goal')
# Goal data
known_data = self.read_agents_known_data(type_of_player, 'goal')
# Blocks
known_blocks = self.read_agents_known_blocks(type_of_player, 'goal')
player_goal = {}
player_goal['known_networks'] = known_networks
player_goal['controlled_hosts'] = controlled_hosts
player_goal['known_hosts'] = known_hosts
player_goal['known_data'] = known_data
player_goal['known_services'] = known_services
player_goal['known_blocks'] = known_blocks
return player_goal
|
get_randomize_goal_every_episode(default_value=False)
Get if the randomization should be done only once or at the beginning of every episode
Source code in netsecgame/game/config_parser.py
385
386
387
388
389
390
391
392
393
394 | def get_randomize_goal_every_episode(self, default_value: bool = False) -> bool:
"""
Get if the randomization should be done only once or at the beginning of every episode
"""
try:
randomize_goal_every_episode = self.config["coordinator"]["agents"]["attackers"]["goal"]["is_any_part_of_goal_random"]
except KeyError:
# Option is not in the configuration - default to FALSE
randomize_goal_every_episode = default_value
return randomize_goal_every_episode
|
get_rewards(reward_names, default_value=0)
Reads configuration for rewards for cases listed in 'rewards_names'
Source code in netsecgame/game/config_parser.py
320
321
322
323
324
325
326
327
328
329 | def get_rewards(self, reward_names:list, default_value=0)->dict:
"Reads configuration for rewards for cases listed in 'rewards_names'"
rewards = {}
for name in reward_names:
try:
rewards[name] = self.config['env']["rewards"][name]
except KeyError:
self.logger.warning(f"No reward value found for '{name}'. Usinng default reward({name})={default_value}")
rewards[name] = default_value
return rewards
|
get_scenario()
Get the scenario config objects based on the configuration. Only import objects that are selected via importlib.
Source code in netsecgame/game/config_parser.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374 | def get_scenario(self):
"""
Get the scenario config objects based on the configuration. Only import objects that are selected via importlib.
"""
allowed_names = {
"scenario1" : "netsecgame.game.scenarios.scenario_configuration",
"scenario1_small" : "netsecgame.game.scenarios.smaller_scenario_configuration",
"scenario1_tiny" : "netsecgame.game.scenarios.tiny_scenario_configuration",
"one_network": "netsecgame.game.scenarios.one_net",
"three_net_scenario": "netsecgame.game.scenarios.three_net_scenario",
"two_networks": "netsecgame.game.scenarios.two_nets", # same as scenario1
"two_networks_small": "netsecgame.game.scenarios.two_nets_small", # same as scenario1_small
"two_networks_tiny": "netsecgame.game.scenarios.two_nets_tiny", # same as scenario1_small
}
scenario_name = self.config['env']['scenario']
# make sure to validate the input
if scenario_name not in allowed_names:
raise ValueError(f"Unsupported scenario: {scenario_name}")
# import the correct module
module = importlib.import_module(allowed_names[scenario_name])
return module.configuration_objects
|
get_seed(whom)
Get the seeds
Source code in netsecgame/game/config_parser.py
376
377
378
379
380
381
382
383 | def get_seed(self, whom):
"""
Get the seeds
"""
seed = self.config[whom]['random_seed']
if seed == 'random':
seed = randint(0,100)
return seed
|
get_store_trajectories(default_value=False)
Read if the replay buffer should be stored in file
Source code in netsecgame/game/config_parser.py
341
342
343
344
345
346
347
348
349
350 | def get_store_trajectories(self, default_value: bool = False):
"""
Read if the replay buffer should be stored in file
"""
try:
store_trajectories = self.config['env']['save_trajectories']
except KeyError:
# Option is not in the configuration - default to FALSE
store_trajectories = default_value
return store_trajectories
|
get_use_dynamic_addresses(default_value=False)
Reads if the IP and Network addresses should be dynamically changed.
Source code in netsecgame/game/config_parser.py
331
332
333
334
335
336
337
338
339 | def get_use_dynamic_addresses(self, default_value: bool = False)->bool:
"""
Reads if the IP and Network addresses should be dynamically changed.
"""
try:
use_dynamic_addresses = self.config['env']['use_dynamic_addresses']
except KeyError:
use_dynamic_addresses = default_value
return bool(use_dynamic_addresses)
|
get_use_firewall(default_value=False)
Retrieves if the firewall functionality is allowed for netsecgame.
Default: False
Source code in netsecgame/game/config_parser.py
396
397
398
399
400
401
402
403
404
405 | def get_use_firewall(self, default_value: bool = False)->bool:
"""
Retrieves if the firewall functionality is allowed for netsecgame.
Default: False
"""
try:
use_firewall = self.config['env']['use_firewall']
except KeyError:
use_firewall = default_value
return use_firewall
|
read_agents_controlled_hosts(type_agent, type_data)
Generic function to read the controlled hosts for any agent and goal of position
Source code in netsecgame/game/config_parser.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182 | def read_agents_controlled_hosts(self, type_agent: str, type_data: str) -> set:
"""
Generic function to read the controlled hosts for any agent and goal of position
"""
controlled_hosts_conf = self.config['coordinator']['agents'][type_agent][type_data]['controlled_hosts']
controlled_hosts = set()
for ip in controlled_hosts_conf:
try:
_ = netaddr.IPAddress(ip)
controlled_hosts.add(IP(ip))
except (ValueError, netaddr.AddrFormatError) as e:
if ip == 'random' :
# A random start ip was asked for
controlled_hosts.add('random')
elif ip == 'all_local':
controlled_hosts.add('all_local')
else:
self.logger.error(f'Configuration problem with the controlled hosts: {e}')
return controlled_hosts
|
read_agents_known_blocks(type_agent, type_data)
Generic function to read the known blocks for any agent and goal of position
Source code in netsecgame/game/config_parser.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96 | def read_agents_known_blocks(self, type_agent: str, type_data: str) -> dict:
"""
Generic function to read the known blocks for any agent and goal of position
"""
known_blocks_conf = self.config["coordinator"]['agents'][type_agent][type_data]['known_blocks']
known_blocks = {}
for target_host, block_list in known_blocks_conf.items():
try:
target_host = IP(target_host)
except ValueError:
self.logger.error(f"Error when converting {target_host} to IP address object")
if isinstance(block_list,list):
known_blocks[target_host] = map(lambda x: IP(x), block_list)
elif block_list == "all_attackers":
known_blocks[target_host] = block_list
else:
raise ValueError(f"Unsupported value in 'known_blocks': {known_blocks_conf}")
return known_blocks
|
read_agents_known_data(type_agent, type_data)
Generic function to read the known data for any agent and goal of position
Source code in netsecgame/game/config_parser.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77 | def read_agents_known_data(self, type_agent: str, type_data: str) -> dict:
"""
Generic function to read the known data for any agent and goal of position
"""
known_data_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_data']
known_data = {}
for ip, data in known_data_conf.items():
try:
# Check the host is a good ip
_ = netaddr.IPAddress(ip)
known_data_host = IP(ip)
known_data[known_data_host] = set()
for datum in data:
if not isinstance(datum, list) and datum.lower() == "random":
known_data[known_data_host].add("random")
else:
known_data_content_str_user = datum[0]
known_data_content_str_data = datum[1]
known_data_content = Data(known_data_content_str_user, known_data_content_str_data)
known_data[known_data_host].add(known_data_content)
except (ValueError, netaddr.AddrFormatError):
known_data = {}
return known_data
|
read_agents_known_hosts(type_agent, type_data)
Generic function to read the known hosts for any agent and goal of position
Source code in netsecgame/game/config_parser.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162 | def read_agents_known_hosts(self, type_agent: str, type_data: str) -> set:
"""
Generic function to read the known hosts for any agent and goal of position
"""
known_hosts_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_hosts']
known_hosts = set()
for ip in known_hosts_conf:
try:
_ = netaddr.IPAddress(ip)
known_hosts.add(IP(ip))
except (ValueError, netaddr.AddrFormatError) as e :
if ip == 'random':
# A random start ip was asked for
known_hosts.add('random')
elif ip == 'all_local':
known_hosts.add('all_local')
else:
self.logger.error(f'Configuration problem with the known hosts: {e}')
return known_hosts
|
read_agents_known_networks(type_agent, type_data)
Generic function to read the known networks for any agent and goal of position
Source code in netsecgame/game/config_parser.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 | def read_agents_known_networks(self, type_agent: str, type_data: str) -> set:
"""
Generic function to read the known networks for any agent and goal of position
"""
known_networks_conf = self.config['coordinator']['agents'][type_agent][type_data]['known_networks']
known_networks = set()
for net in known_networks_conf:
try:
if '/' in net:
_ = netaddr.IPNetwork(net)
host_part, net_part = net.split('/')
known_networks.add(Network(host_part, int(net_part)))
except (ValueError, TypeError, netaddr.AddrFormatError):
self.logger.error('Configuration problem with the known networks')
return known_networks
|
read_agents_known_services(type_agent, type_data)
Generic function to read the known services for any agent and goal of position
Source code in netsecgame/game/config_parser.py
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126 | def read_agents_known_services(self, type_agent: str, type_data: str) -> dict:
"""
Generic function to read the known services for any agent and goal of position
"""
known_services_conf = self.config["coordinator"]['agents'][type_agent][type_data]['known_services']
known_services = {}
for ip, data in known_services_conf.items():
try:
# Check the host is a good ip
_ = netaddr.IPAddress(ip)
known_services_host = IP(ip)
known_services[known_services_host] = []
for service in data: # process each item in the list
if isinstance(service, list): # Service defined as list
name = service[0]
type = service[1]
version = service[2]
is_local = service[3]
known_services[known_services_host].append(Service(name, type, version, is_local))
elif isinstance(service, str): # keyword
if service.lower() == "random":
known_services[known_services_host].append("random")
else:
logging.warning(f"Unsupported values in agent known_services{ip}:{service}")
else:
logging.warning(f"Unsupported values in agent known_services{ip}:{service}")
except (ValueError, netaddr.AddrFormatError):
known_services = {}
return known_services
|
read_config_file(conf_file_name)
reads configuration file
Source code in netsecgame/game/config_parser.py
33
34
35
36
37
38
39
40
41
42 | def read_config_file(self, conf_file_name:str):
"""
reads configuration file
"""
try:
with open(conf_file_name) as source:
self.config = yaml.safe_load(source)
except (IOError, TypeError) as e:
self.logger.error(f'Error loading the configuration file{e}')
pass
|
read_env_action_data(action_name)
Generic function to read the known data for any agent and goal of position
Source code in netsecgame/game/config_parser.py
44
45
46
47
48
49
50
51
52 | def read_env_action_data(self, action_name: str) -> float:
"""
Generic function to read the known data for any agent and goal of position
"""
try:
action_success_p = self.config['env']['actions'][action_name]['prob_success']
except KeyError:
action_success_p = 1
return action_success_p
|