Source code for dffml.df.archive

import uuid
import pathlib
import zipfile
import tarfile
import mimetypes
from typing import Any, Dict, Optional, Tuple

from .types import DataFlow, Input, InputFlow, Operation
from ..operation.archive import (
    make_tar_archive,
    make_zip_archive,
    extract_tar_archive,
    extract_zip_archive,
)
from ..operation.compression import (
    gz_compress,
    gz_decompress,
    bz2_compress,
    bz2_decompress,
    xz_compress,
    xz_decompress,
)


def get_key_substr(string: str, dct: dict, return_value: bool = True) -> Any:
    """
    A function to find the first dictionary item whose key contains the
    given substring, returning its value, or its key if ``return_value``
    is False.
    """
    return [
        value if return_value else key
        for key, value in dct.items()
        if string in key.lower()
    ][0]
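
# Illustrative sketch (not part of the original module): resolving an
# operation input definition by key substring. The mapping mirrors the
# {name: definition} shape of op.inputs; the string values here are
# hypothetical stand-ins for Definition objects.
def _example_get_key_substr():
    inputs = {"input_file_path": "DEF_IN", "output_directory_path": "DEF_OUT"}
    # First matching value, and the matching key when return_value=False
    assert get_key_substr("input", inputs) == "DEF_IN"
    assert (
        get_key_substr("output", inputs, return_value=False)
        == "output_directory_path"
    )
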
def get_archive_type(file_path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    A function to get the archive type and compression type if the file
    exists.
    """
    archive_type, compression_type = None, None
    if zipfile.is_zipfile(file_path):
        archive_type = "zip"
    if tarfile.is_tarfile(file_path):
        archive_type = "tar"
        compression_type = mimetypes.guess_type(file_path)[1]
    return archive_type, compression_type
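
# Illustrative sketch (hypothetical paths; assumes the files exist on
# disk): tarfile recognizes compressed tars, so a .tar.gz reports as a
# tar archive, with mimetypes supplying "gzip" from the suffix.
def _example_get_archive_type():
    assert get_archive_type("backup.tar.gz") == ("tar", "gzip")
    assert get_archive_type("photos.zip") == ("zip", None)
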
def get_archive_path_info(path: str) -> Tuple[Optional[str], Optional[str]]:
    """
    A function to find the type of archive from the given path if the
    file does not exist.
    """
    archive_type = None
    file_type, compression_type = mimetypes.guess_type(path)
    file_subtype = file_type.split("/")[-1] if file_type is not None else None
    if file_subtype == "zip":
        archive_type = "zip"
    if file_subtype == "x-tar":
        archive_type = "tar"
    return archive_type, compression_type
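
# Illustrative sketch (hypothetical paths): unlike get_archive_type,
# this inspects the filename alone, so the paths need not exist yet.
def _example_get_archive_path_info():
    assert get_archive_path_info("backup.tar.gz") == ("tar", "gzip")
    assert get_archive_path_info("photos.zip") == ("zip", None)
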
def get_operations(
    archive_action: str, archive_type: str, compression_type: str
) -> Tuple[Operation, Optional[Operation]]:
    """
    A function to fetch relevant operations based on the type of archive
    and its compression, if any.
    """
    operations = {
        "archive_ops": {
            "zip": {
                "extract": extract_zip_archive,
                "archive": make_zip_archive,
            },
            "tar": {
                "extract": extract_tar_archive,
                "archive": make_tar_archive,
            },
        },
        "compression_ops": {
            "gzip": {"compress": gz_compress, "decompress": gz_decompress},
            "xz": {"compress": xz_compress, "decompress": xz_decompress},
            "bzip2": {"compress": bz2_compress, "decompress": bz2_decompress},
        },
    }
    archive_op = operations["archive_ops"][archive_type][archive_action]
    compression_op = None
    if compression_type is not None:
        compression_action = (
            "compress" if archive_action == "archive" else "decompress"
        )
        compression_op = operations["compression_ops"][compression_type][
            compression_action
        ]
    return archive_op, compression_op
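
# Illustrative sketch: extracting a gzip-compressed tar needs both an
# archive operation and a decompression operation, while an
# uncompressed zip needs only the archive operation.
def _example_get_operations():
    assert get_operations("extract", "tar", "gzip") == (
        extract_tar_archive,
        gz_decompress,
    )
    assert get_operations("archive", "zip", None) == (make_zip_archive, None)
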
def deduce_archive_action(
    seed: Dict,
) -> Tuple[str, Optional[str], Optional[str]]:
    """
    A function to deduce the archive action as 'extract' or 'archive'
    based on the seed, and find the type and compression of the archive.
    """
    input_path, output_path = seed["input_path"], seed["output_path"]
    input_exists, input_is_file, input_is_dir = (
        input_path.exists(),
        input_path.is_file(),
        input_path.is_dir(),
    )
    output_exists, output_is_file, output_is_dir = (
        output_path.exists(),
        output_path.is_file(),
        output_path.is_dir(),
    )
    if all([input_exists, output_exists, output_is_dir, input_is_file]):
        # Existing archive file -> existing directory: extract
        action = "extract"
        archive_type, compression_type = get_archive_type(input_path)
    elif all([input_exists, output_exists, input_is_dir, output_is_file]):
        # Existing directory -> existing archive file: archive
        action = "archive"
        archive_type, compression_type = get_archive_type(output_path)
    elif all([input_exists, not output_exists, input_is_dir]):
        # Triggered on first time use: the output archive does not exist
        # yet, so its type must be deduced from the path alone
        action = "archive"
        archive_type, compression_type = get_archive_path_info(output_path)
    return action, archive_type, compression_type
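
# Illustrative sketch (hypothetical paths; assumes "backup.tar.gz"
# exists as a file and "restored" exists as a directory): the action
# is deduced as extraction of a gzip-compressed tar.
def _example_deduce_archive_action():
    seed = {
        "input_path": pathlib.Path("backup.tar.gz"),
        "output_path": pathlib.Path("restored"),
    }
    assert deduce_archive_action(seed) == ("extract", "tar", "gzip")
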
def create_chained_archive_dataflow(
    action, first_op, second_op, seed, temp_dir
) -> DataFlow:
    """
    A function to create a chained dataflow for archive
    extraction/creation with an intermediate compression step.
    """
    second_op_output_typ = "directory" if action == "extract" else "file"
    dataflow = DataFlow(
        operations={
            first_op.op.name: first_op,
            second_op.op.name: second_op,
        },
        seed={
            Input(
                value=seed["input_path"],
                definition=get_key_substr("input", first_op.op.inputs),
            ),
            Input(
                value=temp_dir / f"{uuid.uuid4()}.tar",
                definition=get_key_substr("output", first_op.op.inputs),
            ),
            Input(
                value=seed["output_path"],
                definition=get_key_substr("output", second_op.op.inputs),
                origin="seed.final_output",
            ),
        },
    )
    dataflow.flow.update(
        {
            second_op.op.name: InputFlow(
                inputs={
                    "input_file_path": [{first_op.op.name: "output_path"}],
                    f"output_{second_op_output_typ}_path": [
                        "seed.final_output"
                    ],
                }
            )
        }
    )
    dataflow.update()
    return dataflow
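
# Illustrative sketch of the flow entry this builds when extracting a
# .tar.gz (operation names are assumed to match the dffml archive and
# compression operations): the decompression op runs first, its
# output_path feeds the extract op's input_file_path, and the final
# output directory is taken from the "seed.final_output" origin.
def _example_chained_flow():
    return {
        "extract_tar_archive": InputFlow(
            inputs={
                "input_file_path": [{"gz_decompress": "output_path"}],
                "output_directory_path": ["seed.final_output"],
            }
        )
    }
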
def create_archive_dataflow(seed: set) -> DataFlow:
    """
    A function to create the appropriate dataflow to extract/create an
    archive if it is supported.
    """
    seed = {input_.origin: pathlib.Path(input_.value) for input_ in seed}
    action, archive_type, compression_type = deduce_archive_action(seed)
    archive_op, compression_op = get_operations(
        action, archive_type, compression_type
    )
    if compression_op is None:
        dataflow = DataFlow(
            operations={archive_op.op.name: archive_op},
            seed={
                Input(
                    value=seed["input_path"],
                    definition=get_key_substr("input", archive_op.op.inputs),
                ),
                Input(
                    value=seed["output_path"],
                    definition=get_key_substr("output", archive_op.op.inputs),
                ),
            },
        )
    else:
        # When extracting, decompress before unpacking; when archiving,
        # pack before compressing
        first_op = compression_op if action == "extract" else archive_op
        second_op = (
            compression_op if first_op is not compression_op else archive_op
        )
        dataflow = create_chained_archive_dataflow(
            action, first_op, second_op, seed, seed["input_path"].parent
        )
    return dataflow
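
# Illustrative end-to-end sketch (hypothetical paths; assumes an
# existing "backup.tar.gz" file and an existing "restored" directory,
# plus a placeholder Definition, since only the Input origins and
# values are read when deducing the action). The returned DataFlow can
# then be run with a dffml orchestrator.
def _example_create_archive_dataflow():
    from .types import Definition

    path = Definition(name="path", primitive="string")
    seed = {
        Input(value="backup.tar.gz", definition=path, origin="input_path"),
        Input(value="restored", definition=path, origin="output_path"),
    }
    return create_archive_dataflow(seed)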