Source code for ClearMap.IO.FileUtils

# -*- coding: utf-8 -*-
"""
FileUtils
=========

This module provides utilities for file management used by various IO modules.

See also
--------
:mod:`ClearMap.IO`.
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Pulic License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'https://idisco.info'
__download__ = 'https://www.github.com/ChristophKirst/ClearMap2'

import os
import shutil
import importlib

__all__ = ['is_file', 'is_directory', 'file_extension', 'join', 'split',
           'abspath', 'create_directory', 'delete_directory',
           'copy_file', 'delete_file']

from pathlib import Path

from ClearMap.IO import IO as clearmap_io


##############################################################################
# ## Basic file queries
##############################################################################


[docs] def is_file(filename): """Checks if a file exists. Arguments --------- filename : str The file name to check if it exists. Returns ------- is_file : bool True if filename exists on disk and is not a directory. """ if not isinstance(filename, str): return False if os.path.isdir(filename): return False return os.path.exists(filename)
[docs] def is_directory(dirname): """ Checks if a directory exists. Arguments --------- dirname : str The directory name. Returns ------- is_directory : bool True if source is a real file. """ if not isinstance(dirname, str): return False return os.path.isdir(dirname)
############################################################################## # ## File name manipulation ##############################################################################
[docs] def file_extension(filename): """ Returns the file extension of a file Arguments --------- filename : str The file name. Returns ------- extension : str The file extension or None if it does not exists. """ if not isinstance(filename, str): return fext = filename.split('.') # FIXME: use os.path.splitext(...)[-1][1:] # To remove leading dot if len(fext) < 2: return else: return fext[-1]
[docs] def join(path, filename): """ Joins a path to a file name. Arguments --------- path : str The path to append a file name to. filename : str The file name. Returns ------- filename : str The full file name. """ # TODO: correct to allow joining '/foo' with '/bar' to /foo/bar (os gives /bar!) if len(filename) > 0 and filename[0] == '/': filename = filename[1:] return os.path.join(path, filename)
[docs] def split(filename): """ Splits a file name into it's path and name. Arguments --------- filename : str The file name. Returns ------- path : str The path of the file. filename : str The file name. """ return os.path.split(filename)
[docs] def abspath(filename): """ Returns the filename using the full path specification. Arguments --------- filename : str The file name. Returns ------- filename : str The full file name. """ return os.path.abspath(filename)
############################################################################## # ## File manipulation ##############################################################################
[docs] def create_directory(filename, split=True): """ Creates the directory of the file name if it does not exists. Arguments --------- filename : str The name to create the directory from. split : bool If True, split the filename first. Returns ------- directory : str The directory name. """ if split: path, name = os.path.split(filename) else: path = filename if not is_directory(path): os.makedirs(path) return path
[docs] def delete_directory(filename, split=False): """ Deletes a directory of the filename if it exists. Arguments --------- filename : str The name to create the directory from. split : bool If True, split the filename first. Returns ------- directory : str The directory name. """ if split: path, name = os.path.split(filename) else: path = filename if is_directory(path): shutil.rmtree(path)
[docs] def delete_file(filename): """ Deletes a file. Arguments --------- filename : str Filename to delete. """ if is_file(filename): os.remove(filename)
[docs] def copy_file(source, sink): """Copy a file. Arguments --------- source : str Filename of the file to copy. sink : str File or directory name to copy the file to. Returns ------- sink : str The name of the copied file. """ if is_directory(sink): path, name = os.path.split(source) sink = os.path.join(sink, name) shutil.copy(source, sink) return sink
def uncompress(file_path, extension='zip', check=True, verbose=False): """ Unzips a file only if: 1) the file does not exist (check) 2) the compressed file exists. Arguments --------- file_path : str or pathlib.Path The file path to search for. extension : str The extension for the compressed file. check : bool If True, check if the decompressed file already exists. verbose : bool Print progress info. Returns ------- filename : str or None The uncompressed filename or None if failed. """ compression_algorithms = ('zip', 'bz2', 'gzip', 'lzma') file_path = Path(file_path) if file_path.exists() and file_path.suffix in compression_algorithms: return uncompress(file_path.with_suffix(''), extension=file_path.suffix, check=check, verbose=verbose) if not file_path.exists() or not check: if extension == 'auto': for algo in compression_algorithms: f_path_w_ext = file_path.with_suffix(f'{file_path.suffix}.{algo}') if f_path_w_ext.exists(): extension = algo break else: raise ValueError(f'Could not find compressed source for {file_path}') compressed_path = file_path.with_suffix(f'{file_path.suffix}.{algo}') if compressed_path.exists(): if verbose: print(f'Decompressing source: {compressed_path}') if extension == 'zip': import zipfile try: with zipfile.ZipFile(compressed_path, 'r') as zipf: if file_path.suffix in ('.tif', '.nrrd'): zipf.extract(file_path.name, path=compressed_path.parent) else: zipf.extractall(path=compressed_path.parent) if not file_path.exists(): raise FileNotFoundError except Exception as err: # FIXME: TOO broad print(err) return elif extension in ('bz2', 'gzip', 'lzma'): mod = importlib.import_module(extension) with open(file_path, 'wb') as out, \ open(compressed_path, 'rb') as compressed_file: out.write(mod.decompress(compressed_file.read())) else: raise NotImplementedError(f'Unrecognized compression extension {extension}') else: print(f'Cannot find compressed source: {compressed_path}') return file_path def compress(file_path, extension='zip', check=True, verbose=False): """ Compresses a file only if: 1) the file exists (check) 2) the compressed file does not exist. Arguments --------- file_path : str or pathlib.Path The file path to search for. extension : str The extension for the compressed file. check : bool If True, check if the compressed file already exists. verbose : bool Print progress info. Returns ------- filename : Path or None The compressed filename or None if failed. """ file_path = Path(file_path) if file_path.exists() and check: if extension == 'auto': extension = 'zip' compressed_path = file_path.with_suffix(f'{file_path.suffix}.{extension}') if not compressed_path.exists(): if verbose: print(f'Compressing source: {file_path}') if extension == 'zip': import zipfile with zipfile.ZipFile(compressed_path, 'w', compression=zipfile.ZIP_DEFLATED) as zipf: if file_path.suffix in ('.tif', '.nrrd'): zipf.write(file_path, arcname=file_path.name) else: for root, dirs, files in file_path.walk(): for file in files: zipf.write(root / file, arcname=root / file) if not compressed_path.exists(): raise FileNotFoundError(f'Could not create compressed file {compressed_path} from {file_path}') elif extension in ('bz2', 'gzip', 'lzma'): mod = importlib.import_module(extension) with open(file_path, 'rb') as in_file, open(compressed_path, 'wb') as compressed_file: compressed_file.write(mod.compress(in_file.read())) else: raise NotImplementedError(f'Unrecognized compression extension {extension}') else: print(f'Compressed file {compressed_path} exists. Skipping compression.') return compressed_path def checksum(file_path, algorithm='md5', verbose=False): """ Calculates the checksum of a file. Arguments --------- file_path : str or pathlib.Path The file path to search for. algorithm : str The algorithm to use for the checksum. verbose : bool Print progress info. Returns ------- checksum : str or None The checksum or None if failed. """ file_path = Path(file_path) if file_path.exists(): if verbose: print(f'Calculating checksum for {file_path}') import hashlib hash = hashlib.new(algorithm) # TODO: check because could be faster to call the function directly with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hash.update(chunk) file_checksum = hash.hexdigest() else: raise FileNotFoundError(f'Cannot find file {file_path}') return file_checksum def find_existing_extension(path, extensions): # FIXME: see if could use Glob """ Return the first extension in the list of `extensions` that creates and existing file with `path`. Parameters ---------- path extensions Returns ------- str or None """ for extension in extensions: if path.with_suffix(extension).exists(): return extension def check_extensions(extensions): """ Check if the extensions are valid (i.e. correctly constructed and supported by ClearMap IO). Parameters ---------- extensions: list[str] The list of extensions to check. Raises ------ ValueError """ for ext in extensions: if not ext.startswith('.'): raise ValueError(f'Extension "{ext}" should start with a dot.') if not is_clearmap_source_extension(ext): raise ValueError(f'Unknown extension "{ext}". ' f'Supported extensions are "{clearmap_io.file_extension_to_module.keys()}".') def is_clearmap_source_extension(extension): """ Check if the extension is supported by ClearMap IO. Parameters ---------- extension: str The extension to check. Returns ------- bool """ return extension.lstrip('.') in clearmap_io.file_extension_to_module.keys() ############################################################################### # ## Tests ############################################################################### def test(): import ClearMap.IO.FileUtils as fu reload(fu) filename = fu.__file__ path, name = fu.os.path.split(filename) fu.is_file(filename), fu.is_directory(filename) fu.file_extension(filename)