Module nari.io.reader.actlogutils
Just a bunch of helper methods to spit out events from the ACT log
Expand source code
"""Just a bunch of helper methods to spit out events from the ACT log"""
from datetime import datetime
from hashlib import md5, sha256
from typing import Callable, Optional
from enum import IntEnum, Enum
from nari.types.event import Event
from nari.types import Timestamp
from nari.types.event.limitbreak import LimitBreak
# here we go
from nari.io.reader.actlogutils.metadata import version_from_logline, config_from_logline
from nari.io.reader.actlogutils.zone import zonechange_from_logline
from nari.io.reader.actlogutils.status import statuslist_from_logline, statuslist3_from_logline, statusapply_from_logline
from nari.io.reader.actlogutils.limitbreak import limitbreak_from_logline
from nari.io.reader.actlogutils.ability import ability_from_logline, aoeability_from_logline
from nari.io.reader.actlogutils.tick import tick_from_logline
from nari.io.reader.actlogutils.directorupdate import director_events_from_logline
from nari.io.reader.actlogutils.updatehpmp import updatehpmp_from_logline
from nari.io.reader.actlogutils.actorspawn import actor_spawn_from_logline
from nari.io.reader.actlogutils.gauge import gauge_from_logline
from nari.io.reader.actlogutils.playerstats import playerstats_from_logline
from nari.io.reader.actlogutils.visibility import visibility_from_logline
from nari.io.reader.actlogutils.targeticon import targeticon_from_logline
from nari.io.reader.actlogutils.targetmarker import targetmarker_from_logline
from nari.io.reader.actlogutils.tether import tether_from_logline
from nari.io.reader.actlogutils.waymark import waymark_from_logline
from nari.io.reader.actlogutils.party import partylist_from_logline
from nari.io.reader.actlogutils.effectresult import effectresult_from_logline
from nari.io.reader.actlogutils.cast import startcast_from_logline, stopcast_from_logline
from nari.io.reader.actlogutils.exceptions import InvalidActChecksumAlgorithm
DEFAULT_DATE_FORMAT: str = '%Y-%m-%dT%H:%M:%S.%f%z'
ActEventFn = Callable[[Timestamp, list[str]], Optional[Event]]
# pylint: disable=invalid-name
class ActLogChecksumType(Enum):
"""List of hashsum algorithms used by different ACT versions"""
MD5 = "md5"
SHA256 = "sha256"
# pylint: enable=invalid-name
# pylint: disable=invalid-name
class ActEventType(IntEnum):
"""List of Event types from the ACT network log"""
memorychatlog = 0
memoryzonechange = 1
memorychangeprimaryplayer = 2
memoryaddcombatant = 3
memoryremovecombatant = 4
memorypartylist = 11
memoryplayerstats = 12
networkstartscasting = 20
networkability = 21
networkaoeability = 22
networkcancelability = 23
networkdothot = 24
networkdeath = 25
networkstatusadd = 26
networktargeticon = 27
networkwaymarkmarker = 28
networksignmarker = 29
networkstatusremove = 30
networkgauge = 31
unusedworld = 32
networkdirector = 33
networknametoggle = 34
networktether = 35
networklimitbreak = 36
networkeffectresult = 37
networkstatuslist = 38
networkupdatehpmp = 39
memorychangemap = 40
memorysystemlogmessage = 41
networkstatuslist3 = 42
config = 249
hook = 250
debug = 251
packetdump = 252
version = 253
error = 254
@classmethod
def has_id(cls, id_: int) -> bool:
"""Returns True if the id is in the enum"""
return id_ in cls.__members__.values()
@classmethod
def has_type(cls, name: str) -> bool:
"""Returns True if the name is in the enum"""
return name in cls.__members__.keys() # pylint: disable=consider-iterating-dictionary
# pylint: enable=invalid-name
def date_from_act_timestamp(datestr: str) -> Timestamp:
"""Parse timestamp from ACT log into a Timestamp
Look, this is dirty. This is wrong. Please someone find a better way to do this.
"""
return int(datetime.strptime(f'{datestr[:26]}{datestr[-6:]}', DEFAULT_DATE_FORMAT).timestamp() * 1000)
def validate_checksum(line: str, index: int, algo: ActLogChecksumType = ActLogChecksumType.SHA256) -> bool:
"""Validates an ACT log line
Given some line 1|foo|bar|baz|a823425f532c540667195f641dd3649b, and an index of 1, then the md5sum of
1|foo|bar|baz|1 (where 1 is the index) should be a823425f532c540667195f641dd3649b (which is the checksum value)
"""
parts = line.split('|')
check_hash = parts[-1]
to_hash = f'{"|".join(parts[:-1])}|{index}'.encode('utf-8')
match algo:
case ActLogChecksumType.MD5:
return md5(to_hash).hexdigest() == check_hash
case ActLogChecksumType.SHA256:
return sha256(to_hash).hexdigest()[:16] == check_hash
case _:
raise InvalidActChecksumAlgorithm(f'Unexpected checksum algorithm: {algo}. Expected one of MD5 and SHA256.')
# pylint: disable=unused-argument
def noop(timestamp: Timestamp, params: list[str]) -> Event:
"""Straight-up ignores things"""
# print(f'Ignoring an event with timestamp {timestamp} and params: {"|".join(params)}')
ID_MAPPINGS: dict[int, ActEventFn] = {
# Internal events
ActEventType.config: config_from_logline,
ActEventType.debug: noop,
ActEventType.error: noop,
ActEventType.hook: noop,
ActEventType.packetdump: noop,
ActEventType.version: version_from_logline,
# Memory events
ActEventType.memorychatlog: noop,
ActEventType.memoryzonechange: zonechange_from_logline,
ActEventType.memorychangemap: noop,
ActEventType.memorychangeprimaryplayer: noop,
ActEventType.memoryaddcombatant: actor_spawn_from_logline,
ActEventType.memoryremovecombatant: noop,
ActEventType.memorysystemlogmessage: noop,
ActEventType.memoryplayerstats: playerstats_from_logline,
ActEventType.memorypartylist: partylist_from_logline,
# Network events
ActEventType.networkgauge: gauge_from_logline,
ActEventType.networkdeath: noop,
ActEventType.networknametoggle: visibility_from_logline,
ActEventType.networkupdatehpmp: updatehpmp_from_logline,
ActEventType.networkdirector: director_events_from_logline,
ActEventType.networkstartscasting: startcast_from_logline,
ActEventType.networkcancelability: stopcast_from_logline,
ActEventType.networkability: ability_from_logline,
ActEventType.networkaoeability: aoeability_from_logline,
ActEventType.networkeffectresult: effectresult_from_logline,
ActEventType.networkstatuslist: statuslist_from_logline,
ActEventType.networkstatuslist3: statuslist3_from_logline,
ActEventType.networkstatusadd: statusapply_from_logline,
ActEventType.networkstatusremove: noop,
ActEventType.networkdothot: tick_from_logline,
ActEventType.networklimitbreak: limitbreak_from_logline,
ActEventType.networksignmarker: targetmarker_from_logline,
ActEventType.networktargeticon: targeticon_from_logline,
ActEventType.networkwaymarkmarker: waymark_from_logline,
ActEventType.networktether: tether_from_logline,
# Defined by ACT but unused events
ActEventType.unusedworld: noop,
}
Sub-modules
nari.io.reader.actlogutils.ability
-
Parse ability (action) data from ACT log line
nari.io.reader.actlogutils.actorspawn
-
Parse actor spawn data from ACT log line
nari.io.reader.actlogutils.cast
-
Parses cast data from ACT log line
nari.io.reader.actlogutils.death
-
Parses death data from ACT log line
nari.io.reader.actlogutils.directorupdate
-
Parse director commands from ACT log lines
nari.io.reader.actlogutils.effectresult
-
Parse effect result data from ACT log line
nari.io.reader.actlogutils.exceptions
-
Exceptions related to ACT log parsing
nari.io.reader.actlogutils.gauge
-
"Parse gauge data from ACT log line
nari.io.reader.actlogutils.limitbreak
-
Parses LB bar state data from ACT log line
nari.io.reader.actlogutils.metadata
-
Parses metadata events from ACT log line
nari.io.reader.actlogutils.party
-
Parse partylist data from ACT log line
nari.io.reader.actlogutils.playerstats
-
Parse player stats data from ACT log line
nari.io.reader.actlogutils.status
-
Parse status data from ACT log line
nari.io.reader.actlogutils.targeticon
-
Parse content-specific overhead marker data from ACT log line
nari.io.reader.actlogutils.targetmarker
-
Parse player-applied overhead marker data from ACT log line
nari.io.reader.actlogutils.tether
-
Parse tether data from ACT log line
nari.io.reader.actlogutils.tick
-
Parse effect-over-time data from ACT log line
nari.io.reader.actlogutils.updatehpmp
-
Parse HP and MP updates from ACT log line
nari.io.reader.actlogutils.visibility
-
Parse visibility data from ACT log line
nari.io.reader.actlogutils.waymark
-
Parse waymark data from ACT log line
nari.io.reader.actlogutils.zone
-
Parse zone change data from ACT log line
Functions
def date_from_act_timestamp(datestr: str) ‑> int
-
Parse timestamp from ACT log into a Timestamp Look, this is dirty. This is wrong. Please someone find a better way to do this.
Expand source code
def date_from_act_timestamp(datestr: str) -> Timestamp: """Parse timestamp from ACT log into a Timestamp Look, this is dirty. This is wrong. Please someone find a better way to do this. """ return int(datetime.strptime(f'{datestr[:26]}{datestr[-6:]}', DEFAULT_DATE_FORMAT).timestamp() * 1000)
def noop(timestamp: int, params: list[str]) ‑> Event
-
Straight-up ignores things
Expand source code
def noop(timestamp: Timestamp, params: list[str]) -> Event: """Straight-up ignores things""" # print(f'Ignoring an event with timestamp {timestamp} and params: {"|".join(params)}')
def validate_checksum(line: str, index: int, algo: ActLogChecksumType = ActLogChecksumType.SHA256) ‑> bool
-
Validates an ACT log line Given some line 1|foo|bar|baz|a823425f532c540667195f641dd3649b, and an index of 1, then the md5sum of 1|foo|bar|baz|1 (where 1 is the index) should be a823425f532c540667195f641dd3649b (which is the checksum value)
Expand source code
def validate_checksum(line: str, index: int, algo: ActLogChecksumType = ActLogChecksumType.SHA256) -> bool: """Validates an ACT log line Given some line 1|foo|bar|baz|a823425f532c540667195f641dd3649b, and an index of 1, then the md5sum of 1|foo|bar|baz|1 (where 1 is the index) should be a823425f532c540667195f641dd3649b (which is the checksum value) """ parts = line.split('|') check_hash = parts[-1] to_hash = f'{"|".join(parts[:-1])}|{index}'.encode('utf-8') match algo: case ActLogChecksumType.MD5: return md5(to_hash).hexdigest() == check_hash case ActLogChecksumType.SHA256: return sha256(to_hash).hexdigest()[:16] == check_hash case _: raise InvalidActChecksumAlgorithm(f'Unexpected checksum algorithm: {algo}. Expected one of MD5 and SHA256.')
Classes
class ActEventType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
List of Event types from the ACT network log
Expand source code
class ActEventType(IntEnum): """List of Event types from the ACT network log""" memorychatlog = 0 memoryzonechange = 1 memorychangeprimaryplayer = 2 memoryaddcombatant = 3 memoryremovecombatant = 4 memorypartylist = 11 memoryplayerstats = 12 networkstartscasting = 20 networkability = 21 networkaoeability = 22 networkcancelability = 23 networkdothot = 24 networkdeath = 25 networkstatusadd = 26 networktargeticon = 27 networkwaymarkmarker = 28 networksignmarker = 29 networkstatusremove = 30 networkgauge = 31 unusedworld = 32 networkdirector = 33 networknametoggle = 34 networktether = 35 networklimitbreak = 36 networkeffectresult = 37 networkstatuslist = 38 networkupdatehpmp = 39 memorychangemap = 40 memorysystemlogmessage = 41 networkstatuslist3 = 42 config = 249 hook = 250 debug = 251 packetdump = 252 version = 253 error = 254 @classmethod def has_id(cls, id_: int) -> bool: """Returns True if the id is in the enum""" return id_ in cls.__members__.values() @classmethod def has_type(cls, name: str) -> bool: """Returns True if the name is in the enum""" return name in cls.__members__.keys() # pylint: disable=consider-iterating-dictionary
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var config
var debug
var error
var hook
var memoryaddcombatant
var memorychangemap
var memorychangeprimaryplayer
var memorychatlog
var memorypartylist
var memoryplayerstats
var memoryremovecombatant
var memorysystemlogmessage
var memoryzonechange
var networkability
var networkaoeability
var networkcancelability
var networkdeath
var networkdirector
var networkdothot
var networkeffectresult
var networkgauge
var networklimitbreak
var networknametoggle
var networksignmarker
var networkstartscasting
var networkstatusadd
var networkstatuslist
var networkstatuslist3
var networkstatusremove
var networktargeticon
var networktether
var networkupdatehpmp
var networkwaymarkmarker
var packetdump
var unusedworld
var version
Static methods
def has_id(id_: int) ‑> bool
-
Returns True if the id is in the enum
Expand source code
@classmethod def has_id(cls, id_: int) -> bool: """Returns True if the id is in the enum""" return id_ in cls.__members__.values()
def has_type(name: str) ‑> bool
-
Returns True if the name is in the enum
Expand source code
@classmethod def has_type(cls, name: str) -> bool: """Returns True if the name is in the enum""" return name in cls.__members__.keys() # pylint: disable=consider-iterating-dictionary
class ActLogChecksumType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
List of hashsum algorithms used by different ACT versions
Expand source code
class ActLogChecksumType(Enum): """List of hashsum algorithms used by different ACT versions""" MD5 = "md5" SHA256 = "sha256"
Ancestors
- enum.Enum
Class variables
var MD5
var SHA256