Module nari.io.reader.actlog
Classes and functions for handling network logs from ACT
Expand source code
"""Classes and functions for handling network logs from ACT"""
from typing import Optional
from nari.io.reader import Reader
from nari.io.reader.actlogutils import ID_MAPPINGS, ActEventType, ActLogChecksumType, date_from_act_timestamp, validate_checksum
from nari.types.event import Event
from nari.types.event.version import SemanticVersion, Version
from nari.util.exceptions import EventNotFound
from nari.io.reader.actlogutils.exceptions import InvalidActChecksum
# The last version of ACT to use MD5 checksums
# We default to SHA256, but if the version is <= 2.2.1.6 we switch to MD5
LAST_MD5_VERSION = SemanticVersion(2,2,1,6)
class ActLogReader(Reader):
"""Implementation of the Reader class for parsing ACT network logs"""
def __init__(self, actlog_path, *, raise_on_checksum_failure=False, raise_on_invalid_id=False):
# might need a helper function, but eh.
self.handle = open(actlog_path, 'r', encoding='utf-8') # pylint: disable=consider-using-with
self.index = 1
self.raise_on_checksum_failure = raise_on_checksum_failure
self.raise_on_invalid_id = raise_on_invalid_id
self.algo = ActLogChecksumType.SHA256
def __del__(self):
"""Handles closing the file when the object undergoes garbage collection"""
self.handle.close()
def handle_line(self, line: str) -> Optional[Event]:
"""Handles an ACT-specific line"""
args = line.strip().split('|')
id_ = int(args[0])
if self.raise_on_invalid_id and id_ not in ID_MAPPINGS:
raise EventNotFound(f"ACT id: {id_}")
datestr = args[1]
timestamp = date_from_act_timestamp(datestr)
if self.raise_on_checksum_failure:
if id_ in (ActEventType.memoryzonechange, ActEventType.version):
self.index = 1
if id_ == ActEventType.version:
version_event = ID_MAPPINGS[id_](timestamp, args[2:-1])
if isinstance(version_event, Version) and version_event.version <= LAST_MD5_VERSION:
self.algo = ActLogChecksumType.MD5
if validate_checksum(line.strip(), self.index, self.algo) is False:
raise InvalidActChecksum(f'Invalid checksum for line {line.strip()} with algo {self.algo} and index {self.index})')
self.index += 1
event: Optional[Event] = None
if id_ in ID_MAPPINGS:
event = ID_MAPPINGS[id_](timestamp, args[2:-1])
else:
event = Event(timestamp)
return event
def read_next(self) -> Optional[Event]:
"""Returns an array of all the ACT log events from the file"""
# keep reading until we get a non-None response from handle_line
while True:
line: str = self.handle.readline()
if len(line) == 0:
return None
if (event := self.handle_line(line)) is not None:
return event
Classes
class ActLogReader (actlog_path, *, raise_on_checksum_failure=False, raise_on_invalid_id=False)
-
Implementation of the Reader class for parsing ACT network logs
Expand source code
class ActLogReader(Reader): """Implementation of the Reader class for parsing ACT network logs""" def __init__(self, actlog_path, *, raise_on_checksum_failure=False, raise_on_invalid_id=False): # might need a helper function, but eh. self.handle = open(actlog_path, 'r', encoding='utf-8') # pylint: disable=consider-using-with self.index = 1 self.raise_on_checksum_failure = raise_on_checksum_failure self.raise_on_invalid_id = raise_on_invalid_id self.algo = ActLogChecksumType.SHA256 def __del__(self): """Handles closing the file when the object undergoes garbage collection""" self.handle.close() def handle_line(self, line: str) -> Optional[Event]: """Handles an ACT-specific line""" args = line.strip().split('|') id_ = int(args[0]) if self.raise_on_invalid_id and id_ not in ID_MAPPINGS: raise EventNotFound(f"ACT id: {id_}") datestr = args[1] timestamp = date_from_act_timestamp(datestr) if self.raise_on_checksum_failure: if id_ in (ActEventType.memoryzonechange, ActEventType.version): self.index = 1 if id_ == ActEventType.version: version_event = ID_MAPPINGS[id_](timestamp, args[2:-1]) if isinstance(version_event, Version) and version_event.version <= LAST_MD5_VERSION: self.algo = ActLogChecksumType.MD5 if validate_checksum(line.strip(), self.index, self.algo) is False: raise InvalidActChecksum(f'Invalid checksum for line {line.strip()} with algo {self.algo} and index {self.index})') self.index += 1 event: Optional[Event] = None if id_ in ID_MAPPINGS: event = ID_MAPPINGS[id_](timestamp, args[2:-1]) else: event = Event(timestamp) return event def read_next(self) -> Optional[Event]: """Returns an array of all the ACT log events from the file""" # keep reading until we get a non-None response from handle_line while True: line: str = self.handle.readline() if len(line) == 0: return None if (event := self.handle_line(line)) is not None: return event
Ancestors
Methods
def handle_line(self, line: str) ‑> Optional[Event]
-
Handles an ACT-specific line
Expand source code
def handle_line(self, line: str) -> Optional[Event]: """Handles an ACT-specific line""" args = line.strip().split('|') id_ = int(args[0]) if self.raise_on_invalid_id and id_ not in ID_MAPPINGS: raise EventNotFound(f"ACT id: {id_}") datestr = args[1] timestamp = date_from_act_timestamp(datestr) if self.raise_on_checksum_failure: if id_ in (ActEventType.memoryzonechange, ActEventType.version): self.index = 1 if id_ == ActEventType.version: version_event = ID_MAPPINGS[id_](timestamp, args[2:-1]) if isinstance(version_event, Version) and version_event.version <= LAST_MD5_VERSION: self.algo = ActLogChecksumType.MD5 if validate_checksum(line.strip(), self.index, self.algo) is False: raise InvalidActChecksum(f'Invalid checksum for line {line.strip()} with algo {self.algo} and index {self.index})') self.index += 1 event: Optional[Event] = None if id_ in ID_MAPPINGS: event = ID_MAPPINGS[id_](timestamp, args[2:-1]) else: event = Event(timestamp) return event
def read_next(self) ‑> Optional[Event]
-
Returns an array of all the ACT log events from the file
Expand source code
def read_next(self) -> Optional[Event]: """Returns an array of all the ACT log events from the file""" # keep reading until we get a non-None response from handle_line while True: line: str = self.handle.readline() if len(line) == 0: return None if (event := self.handle_line(line)) is not None: return event
Inherited members