Source code for dffml.source.file

# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Intel Corporation
import os
import io
import abc
import bz2
import gzip
import lzma
import errno
import zipfile
from contextlib import contextmanager
import pathlib

from ..base import config
from .source import BaseSource
from ..util.entrypoint import entrypoint


# Configuration shared by every file backed source.
@config
class FileSourceConfig:
    # Path of the file to load. FileSource.__init__ converts a ``str`` value
    # into a ``pathlib.Path`` so the suffix can be inspected.
    filename: str
    # NOTE(review): presumably a label attached to the loaded records; it is
    # not read anywhere in this module - confirm against the subclasses.
    tag: str = "untagged"
    # When true, FileSource._close() writes the contents back to ``filename``.
    readwrite: bool = False
    # When true, a missing ``filename`` (or one that is a directory) makes
    # FileSource._open() start from an empty state instead of raising
    # FileNotFoundError.
    allowempty: bool = False
@entrypoint("file")
class FileSource(BaseSource):
    """
    FileSource reads from and writes to a file on open / close.

    Entering the context loads the file through ``load_fd``. Leaving the
    context writes it back through ``dump_fd``, but only when the
    ``readwrite`` config property is set.

    The suffix of ``filename`` selects how the file is opened: ``.gz``,
    ``.bz2``, ``.xz`` / ``.lzma`` and ``.zip`` are handled by the matching
    standard library module, anything else is opened as a regular file.
    """

    CONFIG = FileSourceConfig
    # Modes for regular files and (with the "b" flag removed) zip archives
    READMODE: str = "r"
    WRITEMODE: str = "w"
    # Modes passed to gzip.open / bz2.open / lzma.open
    READMODE_COMPRESSED: str = "rt"
    WRITEMODE_COMPRESSED: str = "wt"

    def __init__(self, config):
        super().__init__(config)
        # The rest of the class relies on ``filename.suffix``, so a plain
        # string filename is converted to a Path once, up front.
        if isinstance(getattr(self.config, "filename", None), str):
            with self.config.no_enforce_immutable():
                self.config.filename = pathlib.Path(self.config.filename)

    async def __aenter__(self) -> "FileSource":
        await self._open()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        await self._close()

    async def _empty_file_init(self):
        """
        Return the initial in-memory state used when there is no file to load.
        """
        return {}

    def _compression_module(self):
        """
        Return the standard library module whose ``open()`` handles the
        suffix of the configured filename, or ``None`` when the suffix is not
        one of the single-stream compression formats.
        """
        # Modules (not their ``open`` functions) are stored so that
        # ``module.open`` is looked up at call time.
        return {
            ".gz": gzip,
            ".bz2": bz2,
            ".xz": lzma,
            ".lzma": lzma,
        }.get(self.config.filename.suffix)

    async def _open(self):
        """
        Load the configured file by handing an open file object to
        ``load_fd``.

        If the path does not exist or is a directory, ``self.mem`` is set to
        the result of ``_empty_file_init`` when ``allowempty`` is set,
        otherwise ``FileNotFoundError`` is raised.
        """
        if not os.path.exists(self.config.filename) or os.path.isdir(
            self.config.filename
        ):
            if not self.config.allowempty:
                raise FileNotFoundError(
                    errno.ENOENT,
                    os.strerror(errno.ENOENT),
                    self.config.filename,
                )
            self.logger.debug(
                ("%r is not a file, " % (self.config.filename,))
                + "initializing memory to empty dict"
            )
            self.mem = await self._empty_file_init()
            return
        module = self._compression_module()
        if module is not None:
            opener = module.open(
                self.config.filename, self.READMODE_COMPRESSED
            )
        elif self.config.filename.suffix == ".zip":
            opener = self.zip_opener_helper()
        else:
            opener = open(self.config.filename, self.READMODE)
        with opener as fd:
            await self.load_fd(fd)

    async def _close(self):
        """
        Write the source back to the configured file by handing an open file
        object to ``dump_fd``. Does nothing unless ``readwrite`` is set.
        """
        if not self.config.readwrite:
            return
        module = self._compression_module()
        if module is not None:
            close = module.open(
                self.config.filename, self.WRITEMODE_COMPRESSED
            )
        elif self.config.filename.suffix == ".zip":
            close = self.zip_closer_helper()
        else:
            # newline="" leaves line ending handling to dump_fd
            close = open(self.config.filename, self.WRITEMODE, newline="")
        with close as fd:
            await self.dump_fd(fd)

    @contextmanager
    def zip_opener_helper(self):
        """
        Context manager yielding a readable file object for this source's
        member of the zip archive. The member is named after the class
        ``__qualname__``.

        A text stream is yielded for text modes. When ``READMODE`` contains
        ``"b"`` the raw binary member stream is yielded instead.
        """
        binary = "b" in self.READMODE
        with zipfile.ZipFile(self.config.filename) as archive:
            # ZipFile.open() only accepts "r" or "w", so the binary flag is
            # removed from the mode before it is passed on.
            with archive.open(
                self.__class__.__qualname__,
                mode=self.READMODE.replace("b", ""),
            ) as zip_fd:
                if binary:
                    yield zip_fd
                else:
                    with io.TextIOWrapper(zip_fd, write_through=True) as fd:
                        yield fd

    @contextmanager
    def zip_closer_helper(self):
        """
        Context manager yielding a writable file object for this source's
        member of the zip archive. The archive is written with BZIP2
        compression and the member is named after the class
        ``__qualname__``.

        A text stream is yielded for text modes. When ``WRITEMODE`` contains
        ``"b"`` the raw binary member stream is yielded instead.
        """
        binary = "b" in self.WRITEMODE
        # zipfile rejects modes carrying the binary flag ("wb"), the member
        # stream is always binary anyway.
        zip_mode = self.WRITEMODE.replace("b", "")
        with zipfile.ZipFile(
            self.config.filename, zip_mode, compression=zipfile.ZIP_BZIP2
        ) as archive:
            with archive.open(
                self.__class__.__qualname__,
                mode=zip_mode,
                force_zip64=True,
            ) as zip_fd:
                if binary:
                    yield zip_fd
                else:
                    with io.TextIOWrapper(zip_fd, write_through=True) as fd:
                        yield fd

    @abc.abstractmethod
    async def load_fd(self, fd):
        """
        Populate the source from the open file object ``fd``.
        """
        pass  # pragma: no cover

    @abc.abstractmethod
    async def dump_fd(self, fd):
        """
        Serialize the source to the open file object ``fd``.
        """
        pass  # pragma: no cover
# Configuration for BinaryFileSource. It declares no fields of its own, so it
# has exactly the properties of FileSourceConfig.
@config
class BinaryFileSourceConfig(FileSourceConfig):
    pass
# FileSource variant whose file objects are opened in binary mode, so the
# load_fd / dump_fd implementations of subclasses work with bytes.
@entrypoint("binaryfile")
class BinaryFileSource(FileSource):
    CONFIG = BinaryFileSourceConfig

    # Modes used for regular files
    READMODE: str = "rb"
    WRITEMODE: str = "wb"

    # Modes used with gzip.open / bz2.open / lzma.open
    READMODE_COMPRESSED: str = "rb"
    WRITEMODE_COMPRESSED: str = "wb"