Module nari.io.writer
Collection of writing-related classes and utilities
Expand source code
"""Collection of writing-related classes and utilities"""
from abc import ABCMeta, abstractmethod
from typing import Iterator
from nari.types.event import Event
class Writer(metaclass=ABCMeta):
    """Abstract base for writer objects that consume a stream of events.

    Subclasses must implement :meth:`write_next` to handle a single event and
    may override :meth:`cleanup` to run work once the stream is exhausted.
    """

    def __init__(self, stream: Iterator[Event]):
        """Store an iterator over *stream* so events can be pulled one by one."""
        self.stream = iter(stream)

    def write(self, amount: int = 0):
        """Consume events from the stream and pass each to :meth:`write_next`.

        Args:
            amount: Maximum number of events to write during this call. A
                value of zero (the default) or any negative value means
                "no limit": events are written until the stream runs out.

        When the stream is exhausted during the call, :meth:`cleanup` is
        invoked once and the method returns. If the requested amount is
        reached first, :meth:`cleanup` is not called, so a later call can
        continue from where this one stopped.
        """
        unlimited = amount <= 0
        written = 0
        # Count events actually written so that write(n) emits exactly n
        # events (the counter previously started at 1 and emitted n - 1).
        while unlimited or written < amount:
            try:
                # Only next() is guarded: a StopIteration raised inside
                # write_next must not be mistaken for end-of-stream.
                next_event = next(self.stream)
            except StopIteration:
                self.cleanup()
                return
            self.write_next(next_event)
            written += 1

    @abstractmethod
    def write_next(self, event: Event) -> None:
        """Write a single event; implementing classes must provide this."""

    def cleanup(self):
        """Hook called by :meth:`write` when the stream is exhausted.

        The base implementation does nothing; override it to perform any
        cleanup needed after the last item was written.
        """
Sub-modules
nari.io.writer.pickle
-
Uses Python's pickle module to write logs to a blob file. Probably not helpful.
Classes
class Writer (stream: Iterator[Event])
-
Represents an abstract base for writer objects
Expand source code
class Writer(metaclass=ABCMeta):
    """Abstract base for writer objects that consume a stream of events.

    Subclasses must implement :meth:`write_next` to handle a single event and
    may override :meth:`cleanup` to run work once the stream is exhausted.
    """

    def __init__(self, stream: Iterator[Event]):
        """Store an iterator over *stream* so events can be pulled one by one."""
        self.stream = iter(stream)

    def write(self, amount: int = 0):
        """Consume events from the stream and pass each to :meth:`write_next`.

        Args:
            amount: Maximum number of events to write during this call. A
                value of zero (the default) or any negative value means
                "no limit": events are written until the stream runs out.

        When the stream is exhausted during the call, :meth:`cleanup` is
        invoked once and the method returns. If the requested amount is
        reached first, :meth:`cleanup` is not called.
        """
        unlimited = amount <= 0
        written = 0
        # Count events actually written so that write(n) emits exactly n
        # events (the counter previously started at 1 and emitted n - 1).
        while unlimited or written < amount:
            try:
                # Only next() is guarded: a StopIteration raised inside
                # write_next must not be mistaken for end-of-stream.
                next_event = next(self.stream)
            except StopIteration:
                self.cleanup()
                return
            self.write_next(next_event)
            written += 1

    @abstractmethod
    def write_next(self, event: Event) -> None:
        """Write a single event; implementing classes must provide this."""

    def cleanup(self):
        """Hook called by :meth:`write` when the stream is exhausted.

        The base implementation does nothing; override it to perform any
        cleanup needed after the last item was written.
        """
Subclasses
Methods
def cleanup(self)
-
Override to perform any cleanup needed after the last item has been written.
Expand source code
def cleanup(self):
    """Perform post-write cleanup; the default implementation is a no-op.

    Override to provide any cleanup items you need to do after the last
    item was written.
    """
    return None
def write(self, amount: int = 0)
-
Prompts the writer to consume a given number of events, or all remaining events when no amount is given.
Expand source code
def write(self, amount: int = 0):
    """Consume events from ``self.stream`` and pass each to ``write_next``.

    Args:
        amount: Maximum number of events to write during this call. A
            value of zero (the default) or any negative value means
            "no limit": events are written until the stream runs out.

    When the stream is exhausted during the call, ``self.cleanup()`` is
    invoked once and the method returns. If the requested amount is
    reached first, ``cleanup`` is not called.
    """
    unlimited = amount <= 0
    written = 0
    # Count events actually written so that write(n) emits exactly n
    # events (the counter previously started at 1 and emitted n - 1).
    while unlimited or written < amount:
        try:
            # Only next() is guarded: a StopIteration raised inside
            # write_next must not be mistaken for end-of-stream.
            next_event = next(self.stream)
        except StopIteration:
            self.cleanup()
            return
        self.write_next(next_event)
        written += 1
def write_next(self, event: Event) ‑> None
-
Implementing classes must implement this method to write the next event
Expand source code
@abstractmethod
def write_next(self, event: Event) -> None:
    """Write the next event.

    Implementing classes must implement this method to write the next event.
    """