Module nari.io.writer

Collection of writing-related classes and utilities

Expand source code
"""Collection of writing-related classes and utilities"""

from abc import ABCMeta, abstractmethod
from typing import Iterator

from nari.types.event import Event


class Writer(metaclass=ABCMeta):
    """Abstract base class for objects that write events taken from a stream.

    Subclasses must implement ``write_next`` to emit a single event and may
    override ``cleanup`` to run logic once the stream has been exhausted.
    """

    def __init__(self, stream: Iterator[Event]):
        # iter() accepts any iterable and leaves an existing iterator as-is.
        self.stream = iter(stream)

    def write(self, amount: int = 0) -> None:
        """Consume events from the stream and pass each one to ``write_next``.

        Args:
            amount: Maximum number of events to write during this call. Zero
                (the default) or a negative value writes every remaining
                event.

        ``cleanup`` is invoked only when the stream runs out during this
        call; stopping because ``amount`` was reached does not trigger it.
        """
        written = 0
        # Count from zero so that exactly ``amount`` events are written.
        while amount <= 0 or written < amount:
            # Only next() is guarded: a StopIteration escaping write_next
            # must not be mistaken for the end of the stream.
            try:
                next_event = next(self.stream)
            except StopIteration:
                self.cleanup()
                return
            self.write_next(next_event)
            written += 1

    @abstractmethod
    def write_next(self, event: Event) -> None:
        """Write a single event; every concrete subclass must implement this."""

    def cleanup(self) -> None:
        """Hook called by ``write`` once the stream is exhausted; no-op by default."""

Sub-modules

nari.io.writer.pickle

Uses Python's pickle module to write logs to a blob file. Probably not helpful.

Classes

class Writer (stream: Iterator[Event])

Represents an abstract base for writer objects

Expand source code
class Writer(metaclass=ABCMeta):
    """Abstract base class for objects that write events taken from a stream.

    Subclasses must implement ``write_next`` to emit a single event and may
    override ``cleanup`` to run logic once the stream has been exhausted.
    """

    def __init__(self, stream: Iterator[Event]):
        # iter() accepts any iterable and leaves an existing iterator as-is.
        self.stream = iter(stream)

    def write(self, amount: int = 0) -> None:
        """Consume events from the stream and pass each one to ``write_next``.

        Args:
            amount: Maximum number of events to write during this call. Zero
                (the default) or a negative value writes every remaining
                event.

        ``cleanup`` is invoked only when the stream runs out during this
        call; stopping because ``amount`` was reached does not trigger it.
        """
        written = 0
        # Count from zero so that exactly ``amount`` events are written.
        while amount <= 0 or written < amount:
            # Only next() is guarded: a StopIteration escaping write_next
            # must not be mistaken for the end of the stream.
            try:
                next_event = next(self.stream)
            except StopIteration:
                self.cleanup()
                return
            self.write_next(next_event)
            written += 1

    @abstractmethod
    def write_next(self, event: Event) -> None:
        """Write a single event; every concrete subclass must implement this."""

    def cleanup(self) -> None:
        """Hook called by ``write`` once the stream is exhausted; no-op by default."""

Subclasses

Methods

def cleanup(self)

Override to perform any cleanup that is needed after the last item has been written

Expand source code
def cleanup(self):
    """Hook for subclasses: run any teardown needed once the final item has been written.

    The base implementation does nothing.
    """
def write(self, amount: int = 0)

Prompts the writer to consume up to the given number of events, or all remaining events when no amount is given

Expand source code
def write(self, amount: int = 0) -> None:
    """Consume events from the stream and pass each one to ``write_next``.

    Args:
        amount: Maximum number of events to write during this call. Zero
            (the default) or a negative value writes every remaining event.

    ``cleanup`` is invoked only when the stream runs out during this call;
    stopping because ``amount`` was reached does not trigger it.
    """
    written = 0
    # Count from zero so that exactly ``amount`` events are written.
    while amount <= 0 or written < amount:
        # Only next() is guarded: a StopIteration escaping write_next must
        # not be mistaken for the end of the stream.
        try:
            next_event = next(self.stream)
        except StopIteration:
            self.cleanup()
            return
        self.write_next(next_event)
        written += 1
def write_next(self, event: Event) ‑> None

Implementing classes must implement this method to write the next event

Expand source code
@abstractmethod
def write_next(self, event: Event) -> None:
    """Write a single event.

    Abstract: every concrete subclass has to supply its own implementation.
    """