Skip to content

General Utilities

This module provides various utility functions used throughout the NetSecGame framework.

netsecgame.utils.utils

generate_valid_actions

Generates the set of all valid actions in a given GameState.

Args: state (GameState): The current game state. include_blocks (bool): Whether to include BlockIP actions. Defaults to False.

Returns:

Name Type Description
set Set[Action]

A set of valid Action objects.

Source code in netsecgame/utils/utils.py
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
def generate_valid_actions(state: GameState, include_blocks=False)->Set[Action]:
    """Generate the set of all valid actions in a given GameState.

    For every controlled host the function emits ScanNetwork actions for each
    known network, and FindServices / ExploitService / FindData actions for
    targets that are not listed in ``state.known_blocks`` for that source.
    ExfiltrateData actions are emitted for every host with known data towards
    every other controlled host, and BlockIP actions are emitted only when
    ``include_blocks`` is true.

    Args:
        state (GameState): The current game state.
        include_blocks (bool): Whether to include BlockIP actions. Defaults to False.

    Returns:
        set: A set of valid Action objects.
    """
    def is_fw_blocked(src_ip, dst_ip) -> bool:
        """Return True when dst_ip is listed in the known blocks of src_ip."""
        try:
            return dst_ip in state.known_blocks[src_ip]
        except KeyError:
            # this src ip has no known blocks
            return False

    valid_actions = set()

    for source_host in state.controlled_hosts:
        # Network Scans
        for network in state.known_networks:
            # TODO ADD neighbouring networks
            valid_actions.add(Action(ActionType.ScanNetwork, parameters={"target_network": network, "source_host": source_host,}))

        # Service Scans
        for target_host in state.known_hosts:
            if not is_fw_blocked(source_host, target_host):
                valid_actions.add(Action(ActionType.FindServices, parameters={"target_host": target_host, "source_host": source_host,}))

        # Service Exploits
        for target_host, service_list in state.known_services.items():
            if not is_fw_blocked(source_host, target_host):
                for service in service_list:
                    valid_actions.add(Action(ActionType.ExploitService, parameters={"target_host": target_host,"target_service": service,"source_host": source_host,}))

        # Data Scans
        for target_host in state.controlled_hosts:
            if not is_fw_blocked(source_host, target_host):
                # NOTE(review): the action uses the target as its own source host
                # (kept as in the original implementation) - confirm this is intended.
                valid_actions.add(Action(ActionType.FindData, parameters={"target_host": target_host, "source_host": target_host}))

    # Data Exfiltration
    # Independent of the loop above: the source is every host with known data,
    # so it is computed once instead of once per controlled host.
    for data_host, data_list in state.known_data.items():
        for data in data_list:
            for trg_host in state.controlled_hosts:
                if trg_host != data_host and not is_fw_blocked(data_host, trg_host):
                    valid_actions.add(Action(ActionType.ExfiltrateData, parameters={"target_host": trg_host, "source_host": data_host, "data": data}))

    # BlockIP
    # Also independent of the per-host loop above; iterates all controlled pairs itself.
    if include_blocks:
        for source_host in state.controlled_hosts:
            for target_host in state.controlled_hosts:
                if not is_fw_blocked(source_host, target_host):
                    for blocked_ip in state.known_hosts:
                        valid_actions.add(Action(ActionType.BlockIP, {"target_host":target_host, "source_host":source_host, "blocked_host":blocked_ip}))
    return valid_actions

get_file_hash

Computes hash of a given file.

Parameters:

Name Type Description Default
filepath str

The path to the file to hash.

required
hash_func str

The hash function to use (default is 'sha256').

'sha256'
chunk_size int

The size of each chunk to read from the file (default is 4096 bytes).

4096

Returns:

Name Type Description
str str

The hexadecimal hash of the file.

Source code in netsecgame/utils/utils.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_file_hash(filepath: str, hash_func: str = 'sha256', chunk_size: int = 4096) -> str:
    """
    Return the hexadecimal digest of a file's contents.

    The file is opened in binary mode and fed to the hash object piece by
    piece, so the whole file does not have to be read in a single call.

    Args:
        filepath (str): Path of the file to hash.
        hash_func (str): Algorithm name passed to ``hashlib.new`` (default is 'sha256').
        chunk_size (int): Number of bytes requested per read (default is 4096).

    Returns:
        str: The hexadecimal hash of the file.
    """
    digest = hashlib.new(hash_func)
    with open(filepath, 'rb') as stream:
        # iter() with a sentinel stops at the first empty read, i.e. end of file.
        for block in iter(lambda: stream.read(chunk_size), b''):
            digest.update(block)
    return digest.hexdigest()

get_logging_level

Returns the logging level constant that corresponds to the provided debug_level string; unrecognised names fall back to logging.ERROR.

Parameters:

Name Type Description Default
debug_level str

The level name (e.g., 'DEBUG', 'INFO').

required

Returns:

Name Type Description
int int

The corresponding logging level constant.

Source code in netsecgame/utils/utils.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def get_logging_level(debug_level: str) -> int:
    """
    Translate a level name into the matching ``logging`` constant.

    The lookup is case-insensitive. Only DEBUG, INFO, WARNING, ERROR and
    CRITICAL are recognised; any other name falls back to ``logging.ERROR``.

    Args:
        debug_level (str): The level name (e.g., 'DEBUG', 'INFO').

    Returns:
        int: The corresponding logging level constant.
    """
    recognised = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")
    wanted = debug_level.upper()
    if wanted in recognised:
        # Each recognised name is also the attribute name of its constant.
        return getattr(logging, wanted)
    return logging.ERROR

get_str_hash

Computes hash of a given string.

Parameters:

Name Type Description Default
string str

The input string to hash.

required
hash_func str

The hash function to use (default is 'sha256').

'sha256'

Returns:

Name Type Description
str str

The hexadecimal hash of the input string.

Source code in netsecgame/utils/utils.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def get_str_hash(string: str, hash_func: str = 'sha256') -> str:
    """
    Return the hexadecimal digest of a string.

    The string is encoded as UTF-8 before it is hashed.

    Args:
        string (str): The input string to hash.
        hash_func (str): Algorithm name passed to ``hashlib.new`` (default is 'sha256').

    Returns:
        str: The hexadecimal hash of the input string.
    """
    digest = hashlib.new(hash_func)
    encoded = string.encode('utf-8')
    digest.update(encoded)
    return digest.hexdigest()

observation_as_dict

Generates a dictionary representation of a given Observation object.

Parameters:

Name Type Description Default
observation Observation

The observation object to convert.

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: The dictionary representation of the observation.

Source code in netsecgame/utils/utils.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def observation_as_dict(observation: Observation) -> Dict[str, Any]:
    """
    Build a dictionary representation of an Observation.

    The result has the keys 'state', 'reward', 'end' and 'info'. The state
    entry is whatever ``observation.state.as_dict`` evaluates to; the info
    entry is a new dict built from ``observation.info``.

    Args:
        observation (Observation): The observation object to convert.

    Returns:
        Dict[str, Any]: The dictionary representation of the observation.
    """
    state_repr = observation.state.as_dict
    reward = observation.reward
    finished = observation.end
    # dict(...) builds a fresh dict, so the result does not alias observation.info.
    info_copy = dict(observation.info)
    return {
        'state': state_repr,
        'reward': reward,
        'end': finished,
        'info': info_copy,
    }

observation_from_dict

Reconstructs an Observation object from a dictionary representation.

Parameters:

Name Type Description Default
data Dict[str, Any]

The dictionary containing observation data.

required

Returns:

Name Type Description
Observation Observation

The reconstructed Observation namedtuple.

Source code in netsecgame/utils/utils.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def observation_from_dict(data: Dict[str, Any]) -> Observation:
    """
    Rebuild an Observation from its dictionary representation.

    The 'state' entry must itself be a dictionary and is converted with
    ``GameState.from_dict``. Missing 'reward', 'end' and 'info' entries
    default to 0.0, False and an empty dict respectively.

    Args:
        data (Dict[str, Any]): The dictionary containing observation data.

    Returns:
        Observation: The reconstructed Observation namedtuple.

    Raises:
        Exception: Any error raised during reconstruction (including the
            ValueError for a non-dict 'state') is logged and re-raised.
    """
    try:
        raw_state = data.get("state")

        # Guard clause: anything other than a dict cannot be turned into a GameState.
        if not isinstance(raw_state, dict):
            raise ValueError(f"Expected dictionary for 'state', got {type(raw_state)}")

        fields = {
            "state": GameState.from_dict(raw_state),
            "reward": float(data.get("reward", 0.0)),
            "end": bool(data.get("end", False)),
            "info": data.get("info", {}),
        }
        return Observation(**fields)
    except Exception as err:
        logging.getLogger(__name__).error(f"Error in creating Observation from dict: {err}")
        raise err

observation_from_str

Reconstructs an Observation object from a JSON string representation.

Parameters:

Name Type Description Default
json_str str

The JSON string representation of the observation.

required

Returns:

Name Type Description
Observation Observation

The reconstructed Observation namedtuple.

Source code in netsecgame/utils/utils.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def observation_from_str(json_str: str) -> Observation:
    """
    Rebuild an Observation from its JSON string representation.

    The string is parsed with ``json.loads`` and the result is handed to
    ``observation_from_dict``, which performs the actual reconstruction.

    Args:
        json_str (str): The JSON string representation of the observation.

    Returns:
        Observation: The reconstructed Observation namedtuple.

    Raises:
        Exception: Any parsing or reconstruction error is logged and re-raised.
    """
    try:
        # Parse first, then delegate so the dict-handling logic lives in one place.
        return observation_from_dict(json.loads(json_str))
    except Exception as err:
        logging.getLogger(__name__).error(f"Error in creating Observation from string: {err}")
        raise err

observation_to_str

Generates a JSON string representation of a given Observation object.

Parameters:

Name Type Description Default
observation Observation

The observation object to convert.

required

Returns:

Name Type Description
str str

The JSON string representation.

Source code in netsecgame/utils/utils.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def observation_to_str(observation: Observation) -> str:
    """
    Serialize an Observation into a JSON string.

    The observation is first converted with ``observation_as_dict`` and the
    resulting dictionary is encoded with ``json.dumps``.

    Args:
        observation (Observation): The observation object to convert.

    Returns:
        str: The JSON string representation.

    Raises:
        Exception: Any conversion or encoding error is logged and re-raised.
    """
    try:
        # The state is embedded as a nested object, not as an escaped JSON string.
        payload = observation_as_dict(observation)
        return json.dumps(payload)
    except Exception as err:
        logging.getLogger(__name__).error(f"Error in encoding observation '{observation}' to JSON string: {err}")
        raise err

parse_log_content

Parses a JSON string of log content into a list of log entries.

Parameters:

Name Type Description Default
log_content str

The raw JSON log content.

required

Returns:

Type Description
Optional[List[Dict[str, Any]]]

Optional[List[Dict[str, Any]]]: A list of log entries if successful, None otherwise.

Source code in netsecgame/utils/utils.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def parse_log_content(log_content: str) -> Optional[List[Dict[str, Any]]]:
    """
    Parses a JSON string of log content into a list of log entries.

    The decoded JSON is expected to be an iterable of mappings, each with a
    "source_host" and an "action_type" key. Every entry is converted into a
    dict holding an ``IP`` object and an ``ActionType`` value.

    Args:
        log_content (str): The raw JSON log content.

    Returns:
        Optional[List[Dict[str, Any]]]: A list of log entries if successful.
        None if the content is not valid JSON, is not an iterable of mappings,
        or an entry lacks one of the required keys.
    """
    logger = logging.getLogger(__name__)

    try:
        data = json.loads(log_content)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers non-string input such as None.
        logger.error(f"Error decoding JSON: {e}")
        return None

    logs = []
    try:
        for item in data:
            # Read both required keys before converting, so a malformed entry is
            # rejected up front instead of failing halfway through.
            raw_host = item["source_host"]
            raw_action = item["action_type"]
            ip = IP(raw_host)
            action_type = ActionType.from_string(raw_action)
            logs.append({"source_host":ip, "action_type":action_type})
    except (TypeError, KeyError) as e:
        # TypeError: data or an entry is not subscriptable/iterable as expected.
        # KeyError: an entry is missing "source_host" or "action_type".
        logger.error(f"Error parsing log entries: {e}")
        return None
    return logs

read_replay_buffer_from_csv

Reads steps from a CSV file and restores objects for the replay buffer.

Parameters:

Name Type Description Default
csvfile str

Path to the CSV file.

required

Returns:

Type Description
List[Tuple[GameState, Action, float, GameState, bool]]

List[Tuple[GameState, Action, float, GameState, bool]]: The restored replay buffer.

Source code in netsecgame/utils/utils.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def read_replay_buffer_from_csv(csvfile: str) -> List[Tuple[GameState, Action, float, GameState, bool]]:
    """
    Deprecated reader for replay-buffer steps stored in a CSV file.

    The function raises unconditionally and never reads the file; it is kept
    only so that existing call sites fail with an explicit message.

    Args:
        csvfile (str): Path to the CSV file (unused).

    Returns:
        List[Tuple[GameState, Action, float, GameState, bool]]: Never returned;
        the function always raises.

    Raises:
        DeprecationWarning: Always.
    """
    raise DeprecationWarning("This function is deprecated and will be removed in future versions.")

read_trajectories_from_jsonl

Reads trajectories from a JSONL file.

Parameters:

Name Type Description Default
filepath str

Path to the JSONL file.

required

Returns:

Type Description
List[Any]

List[Any]: A list of trajectories read from the file.

Source code in netsecgame/utils/utils.py
283
284
285
286
287
288
289
290
291
292
293
def read_trajectories_from_jsonl(filepath: str) -> List[Any]:
    """
    Placeholder for reading trajectories from a JSONL file.

    Args:
        filepath (str): Path to the JSONL file (currently unused).

    Returns:
        List[Any]: A list of trajectories read from the file, once implemented.

    Raises:
        NotImplementedError: Always; the reader has not been written yet.
    """
    reason = "This function is not yet implemented."
    raise NotImplementedError(reason)

state_as_ordered_string

Converts a GameState into a deterministic ordered string representation.

Parameters:

Name Type Description Default
state GameState

The game state to convert.

required

Returns:

Name Type Description
str str

The ordered string representation of the state.

Source code in netsecgame/utils/utils.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def state_as_ordered_string(state: GameState) -> str:
    """
    Render a GameState as a deterministic string.

    Every collection is sorted before it is written, so two states with the
    same content produce the same string regardless of iteration order.

    Args:
        state (GameState): The game state to convert.

    Returns:
        str: The ordered string representation of the state.
    """
    def _sorted_csv(items) -> str:
        # Comma-separated string forms of the items, in sorted order.
        return ",".join(str(item) for item in sorted(items))

    def _sorted_entries(mapping) -> str:
        # "key:[v1,v2]" for each key in sorted order, concatenated with no separator.
        return "".join(
            f"{key}:[{_sorted_csv(mapping[key])}]" for key in sorted(mapping.keys())
        )

    pieces = [
        f"nets:[{_sorted_csv(state.known_networks)}],",
        f"hosts:[{_sorted_csv(state.known_hosts)}],",
        f"controlled:[{_sorted_csv(state.controlled_hosts)}],",
        "services:{",
        _sorted_entries(state.known_services),
        "},data:{",
        _sorted_entries(state.known_data),
        "}, blocks:{",
        _sorted_entries(state.known_blocks),
        "}",
    ]
    return "".join(pieces)

store_replay_buffer_in_csv

Stores steps from a replay buffer into a CSV file.

Parameters:

Name Type Description Default
replay_buffer List[Tuple[GameState, Action, float, GameState, bool]]

The buffer items to store.

required
filename str

The name of the output file.

required
delimiter str

The delimiter to use in the CSV (default is ';').

';'

Returns:

Type Description
None

None

Source code in netsecgame/utils/utils.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def store_replay_buffer_in_csv(replay_buffer: List[Tuple[GameState, Action, float, GameState, bool]], filename: str, delimiter: str = ";") -> None:
    """
    Deprecated writer for replay-buffer steps into a CSV file.

    The function raises unconditionally and never writes anything; it is kept
    only so that existing call sites fail with an explicit message.

    Args:
        replay_buffer (List[Tuple[GameState, Action, float, GameState, bool]]): The buffer items to store (unused).
        filename (str): The name of the output file (unused).
        delimiter (str): The delimiter to use in the CSV (default is ';', unused).

    Returns:
        None

    Raises:
        DeprecationWarning: Always.
    """
    raise DeprecationWarning("This function is deprecated and will be removed in future versions.")

store_trajectories_to_jsonl

Stores trajectories to a JSONL file.

Parameters:

Name Type Description Default
trajectories Any

The trajectory data to store (usually a dict or list).

required
dir str

Directory where the file will be stored.

required
filename str

Name of the file (without extension).

required

Returns:

Type Description
None

None

Source code in netsecgame/utils/utils.py
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def store_trajectories_to_jsonl(trajectories: Any, dir: str, filename: str) -> None:
    """
    Stores trajectories to a JSONL file.

    The target directory is created when missing and the data is appended to
    ``<dir>/<filename>.jsonl`` as a single JSON line.

    Args:
        trajectories (Any): The trajectory data to store (usually a dict or list).
        dir (str): Directory where the file will be stored.
        filename (str): Name of the file; a trailing ".jsonl" extension is
            optional and is not duplicated.

    Returns:
        None
    """
    # make sure the directory exists; exist_ok avoids failing when it is created
    # between an existence check and the makedirs call
    os.makedirs(dir, exist_ok=True)
    # construct the full file name
    # Only an exact ".jsonl" suffix is removed. str.rstrip('jsonl') would strip any
    # trailing run of the characters j/s/o/n/l ("episodes" -> "episode") and would
    # leave the dot behind ("x.jsonl" -> "x.").
    extension = ".jsonl"
    stem = filename[:-len(extension)] if filename.endswith(extension) else filename
    filename = os.path.join(dir, f"{stem}{extension}")
    # store the trajectories
    with jsonlines.open(filename, "a") as writer:
        writer.write(trajectories)