# -*- coding: utf-8 -*-
"""
Workspace
=========
The Workspace module keeps track of the data files of a project.
Using this module will simplify access to data and results using coherent
filenames across experiments and samples.
One can think of a Workspace as a transparent data structure for ClearMap.
Note
----
Additional standard filenames can be added in the `default_file_type_to_name` dict.
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Public License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2023 by Christoph Kirst'
__webpage__ = 'https://idisco.info'
__download__ = 'https://www.github.com/ChristophKirst/ClearMap2'
# TODO: provide modularity ->
# e.g. modules for various files,
# adding/removing channels,
# slicing of test data etc.
import os
from collections import OrderedDict
import numpy as np
import ClearMap.IO.IO as clearmap_io
from ClearMap.Utils.TagExpression import Expression
# import ClearMap.ParallelProcessing.DataProcessing.ArrayProcessing as array_processing
import ClearMap.Visualization.Plot3d as q_plot_3d
########################################################################################################################
# Workspaces
########################################################################################################################
# Names of the workspace flavours used by this module; None is the generic default workspace.
workspace_types = [None, 'CellMap', 'TubeMap', 'AxonMap', 'Both'] # FIXME: these shouldn't be mutually exclusive
########################################################################################################################
# File and directory names
########################################################################################################################
# Directory type used for every file type that has no explicit entry in the
# file-type -> directory-type tables below.
default_directory_type = 'analysis'

# Directory type -> actual directory. None means "not configured"; callers then
# fall back to the default directory they were given.
default_directory_type_to_name = OrderedDict(
    data=None,
    analysis=None,
)

# File type -> file name (or tag expression) relative to its directory.
default_file_type_to_name = OrderedDict(
    raw="Raw/raw_<X,2>_<Y,2>.npy",
    autofluorescence="Autofluorescence/auto_<X,2>_<Y,2>.npy",
    stitched="stitched.npy",
    layout="layout.lyt",
    background="background.npy",
    resampled="resampled.tif",
    autofluorescence_resampled="resampled_autofluorescence.tif",
    resampled_to_autofluorescence='elastix_resampled_to_auto',
    resampled_aligned_to_autofluorescence="elastix_resampled_to_auto/result.0.mhd",
    autofluorescence_to_reference='elastix_auto_to_reference',
    autofluorescence_aligned_to_reference="elastix_auto_to_reference/result.1.mhd",
)

# File type -> directory type. File types not listed here use `default_directory_type`.
default_file_type_to_directory_type = OrderedDict(
    raw="data",
    autofluorescence="data",
    resampled="data",
    autofluorescence_resampled="data",
    resampled_to_autofluorescence="data",
    resampled_aligned_to_autofluorescence="data",
    autofluorescence_to_reference="data",
    autofluorescence_aligned_to_reference="data",
)

# Abbreviation / alias -> canonical file type.
default_file_type_synonyms = dict(
    r="raw",
    a="autofluorescence",
    st="stitched",
    l="layout",
    bg="background",
    rs="resampled",
    auto="autofluorescence",
    resampled_to_auto='resampled_to_autofluorescence',
    resampled_aligned_to_auto='resampled_aligned_to_autofluorescence',
    auto_to_reference='autofluorescence_to_reference',
    auto_aligned_to_reference='autofluorescence_aligned_to_reference',
    resampled_auto='autofluorescence_resampled',
    auto_resampled='autofluorescence_resampled',
)

# Workspace specialization: one copy of each default table per workspace type,
# extended with the entries specific to that workspace.
default_file_type_to_name_from_workspace = OrderedDict()
default_file_type_to_directory_type_from_workspace = OrderedDict()
default_file_type_synonyms_from_workspace = OrderedDict()

# --- generic workspace (ws_type=None) ---
default_file_type_to_name_from_workspace[None] = default_file_type_to_name.copy()
default_file_type_to_directory_type_from_workspace[None] = default_file_type_to_directory_type.copy()
default_file_type_synonyms_from_workspace[None] = default_file_type_synonyms.copy()

# --- CellMap ---
default_file_type_to_name_from_workspace['CellMap'] = default_file_type_to_name.copy()
default_file_type_to_name_from_workspace['CellMap'].update(
    raw="/Raw/Z<Z,4>.tif",
    autofluorescence="/Autofluorescence/Z<Z,4>.tif",
    cells="cells.npy",
    density="density.tif",
)
default_file_type_to_directory_type_from_workspace['CellMap'] = default_file_type_to_directory_type.copy()
# Placeholder for CellMap specific directory types (the original updated the
# file-name table here by mistake; the call is empty either way).
default_file_type_to_directory_type_from_workspace['CellMap'].update(
)
default_file_type_synonyms_from_workspace['CellMap'] = default_file_type_synonyms.copy()
default_file_type_synonyms_from_workspace['CellMap'].update(
    c="cells",
)

# --- TubeMap ---
default_file_type_to_name_from_workspace['TubeMap'] = default_file_type_to_name.copy()
default_file_type_to_name_from_workspace['TubeMap'].update(
    arteries="Raw/arteries_<X,2>_<Y,2>.npy",  # fixed: the '<' opening the Y tag was missing
    binary="binary.npy",
    binary_status="binary_status.npy",
    skeleton='skeleton.npy',
    graph="graph.gt",
    density="density.tif",
)
default_file_type_to_directory_type_from_workspace['TubeMap'] = default_file_type_to_directory_type.copy()
default_file_type_to_directory_type_from_workspace['TubeMap'].update(
    arteries="data",
)
default_file_type_synonyms_from_workspace['TubeMap'] = default_file_type_synonyms.copy()
default_file_type_synonyms_from_workspace['TubeMap'].update(
    b="binary",
    bs="binary_status",
    g="graph",
    sk="skeleton",
)

# --- Both (CellMap + TubeMap) ---
# Each table is the CellMap table overlaid with the TubeMap one, so TubeMap
# wins for keys present in both (e.g. 'raw'). The file-name table previously
# merged the CellMap *synonyms* by mistake, and the two other tables had no
# 'Both' entry at all, which made lookups for this workspace type fail.
default_file_type_to_name_from_workspace['Both'] = {
    **default_file_type_to_name_from_workspace['CellMap'],
    **default_file_type_to_name_from_workspace['TubeMap'],
}
default_file_type_to_directory_type_from_workspace['Both'] = {
    **default_file_type_to_directory_type_from_workspace['CellMap'],
    **default_file_type_to_directory_type_from_workspace['TubeMap'],
}
default_file_type_synonyms_from_workspace['Both'] = {
    **default_file_type_synonyms_from_workspace['CellMap'],
    **default_file_type_synonyms_from_workspace['TubeMap'],
}

# --- AxonMap ---
default_file_type_to_name_from_workspace['AxonMap'] = default_file_type_to_name.copy()
default_file_type_to_name_from_workspace['AxonMap'].update(
    axons='axons.npy',
    orientation="orientation.npy",
    preprocessed="analyzed/preprocessed.npy",
    lightsheet="analyzed/lightsheet.npy",
    equalized="analyzed/equalized.npy",
    tubeness="analyzed/tubeness.npy",
    hessian="analyzed/hessian.npy",
    features="analyzed/features.npy",
    fod="analyzed/fod.npy",
)
default_file_type_to_directory_type_from_workspace['AxonMap'] = default_file_type_to_directory_type.copy()
default_file_type_to_directory_type_from_workspace['AxonMap'].update(
)
default_file_type_synonyms_from_workspace['AxonMap'] = default_file_type_synonyms.copy()
default_file_type_synonyms_from_workspace['AxonMap'].update(
)
########################################################################################################################
# Determine file and directory names
########################################################################################################################
[docs]
def file_type(f_type=None, ws_type=None, file_type_synonyms=None):
    """Translate a file type synonym into the canonical file type.

    Arguments
    ---------
    f_type : str or None
        The file type or one of its synonyms.
    ws_type : str or None
        The workspace type whose default synonyms are used.
    file_type_synonyms : dict or None
        Extra synonyms; they take precedence over the workspace defaults.

    Returns
    -------
    f_type : str or None
        The canonical file type, or `f_type` unchanged if it is not a known synonym.
    """
    # Work on a copy so the module level defaults are never modified.
    known_synonyms = default_file_type_synonyms_from_workspace[ws_type].copy()
    if file_type_synonyms is not None:
        known_synonyms.update(file_type_synonyms)
    if f_type in known_synonyms:
        return known_synonyms[f_type]
    return f_type
[docs]
def directory_type(f_type=None, ws_type=None,
                   file_type_to_directory_type=None,
                   file_type_synonyms=None, return_f_type=False):
    """Returns the directory type in which a file type is stored.

    Arguments
    ---------
    f_type : str or None
        The file type (or a synonym of it).
    ws_type : str or None
        The workspace type whose defaults are used.
    file_type_to_directory_type : dict or None
        Extra file type to directory type entries; they take precedence over
        the workspace defaults.
    file_type_synonyms : dict or None
        Additional file type synonyms used to resolve `f_type`.
    return_f_type : bool
        If True, also return the resolved file type.

    Returns
    -------
    dirtype : str or None
        The directory type; `default_directory_type` for file types without an
        entry, None if `f_type` is None.
    f_type : str or None
        The resolved file type (only if `return_f_type` is True).
    """
    f_type = file_type(f_type=f_type, ws_type=ws_type,
                       file_type_synonyms=file_type_synonyms)

    file_type_to_directory_type_ = default_file_type_to_directory_type_from_workspace[ws_type].copy()
    if file_type_to_directory_type is not None:
        file_type_to_directory_type_.update(file_type_to_directory_type)

    if f_type is None:
        dirtype = None
    else:
        # Look up the *merged* table. The raw argument was consulted before,
        # which failed for None and ignored the workspace defaults otherwise.
        dirtype = file_type_to_directory_type_.get(f_type, default_directory_type)

    if return_f_type:
        return dirtype, f_type
    return dirtype
[docs]
def directory_name(f_type=None, dirtype=None, ws_type=None, directory=None, default_directory=None,
                   file_type_to_directory_type=None,
                   directory_type_to_name=None,
                   file_type_synonyms=None, return_f_type=False, return_dirtype=False):
    """Returns the directory name to use for a file in the work space.

    Arguments
    ---------
    f_type : str or None
        The type of the file for which the directory is requested. If None,
        `dirtype` or `directory` has to be given.
    dirtype : str or None
        The directory type, overwrites the directory type inferred from the file type if not None.
    ws_type : str or None
        The type of workspace to use. If None, use the default workspace.
    directory : str or None
        A specific directory to use.
    default_directory : str or None
        A default directory to fall back to if not None.
    file_type_to_directory_type : dict or None
        Mapping from file type to directory type. If None, the default is used.
    directory_type_to_name : dict or None
        The mapping between the directory type and the actual directory.
    file_type_synonyms : dict or None
        Additional file type synonyms or abbreviation to use for the file types.
    return_f_type : bool
        If True return also the file type
    return_dirtype : bool
        If True return also the directory type.

    Returns
    -------
    directory : str
        The standard directory name of the requested file type. If requested,
        a tuple (directory[, f_type][, dirtype]) is returned instead.

    Raises
    ------
    ValueError
        If neither a directory nor a directory type can be determined.
    """
    # Infer the directory type (and resolve synonyms) only when not given explicitly.
    if dirtype is None:
        dirtype, f_type = directory_type(f_type=f_type, ws_type=ws_type,
                                         file_type_to_directory_type=file_type_to_directory_type,
                                         file_type_synonyms=file_type_synonyms, return_f_type=True)

    if directory is None:
        if dirtype is None:
            raise ValueError('cannot determine directory')
        known_directories = default_directory_type_to_name.copy()
        if directory_type_to_name is not None:
            known_directories.update(directory_type_to_name)
        directory = known_directories.get(dirtype)

    # Nothing configured for this directory type -> use the fallback (may be None too).
    if directory is None:
        directory = default_directory

    # Drop a single trailing path separator.
    if directory and directory[-1] == os.path.sep:
        directory = directory[:-1]

    extras = []
    if return_f_type:
        extras.append(f_type)
    if return_dirtype:
        extras.append(dirtype)
    if extras:
        return (directory, *extras)
    return directory
[docs]
def file_name(f_type, dirtype=None, ws_type=None, directory=None, default_directory=None,
              expression=None, values=None, prefix=None, postfix=None, extension=None, debug=None,
              file_type_to_name=None, file_type_synonyms=None,
              file_type_to_directory_type=None, directory_type_to_name=None, **kwargs):
    """
    Returns the standard file name to use for a result file.

    Arguments
    ---------
    f_type : str or None
        The type of the file for which the file name is requested.
    dirtype : str or None
        The directory type to use for the requested file name.
    ws_type : str or None
        The type of workspace to use. If None, use the default workspace.
    directory : str or None
        A specific directory for the file.
    default_directory : str or None
        A default directory to fall back to if not None.
    expression : str or None
        A tag expression to use if f_type is 'expression'.
    values : dict or None
        The values to use in case a tag expression is given.
    prefix : str or list of str or None
        Optional prefix to the file if not None. A list is joined with '_'.
    postfix : str or list of str or None
        Optional postfix to the file if not None. A list is joined with '_'.
    extension : str or None
        Optional extension to replace existing one.
    debug : str, bool or None
        Optional string for testing which is prepended to the file name.
        Any other truthy value prepends 'debug'.
    file_type_to_name : dict or None
        The file types to name mappings. If None, the default is used.
    file_type_synonyms : dict or None
        Additional file type synonyms or abbreviation to use for the file types.
    file_type_to_directory_type : dict or None
        Mapping from file type to directory type. If None, the default is used.
    directory_type_to_name : dict or None
        The mapping between the directory type and the actual directory.
    **kwargs
        Forwarded to :func:`directory_name`.

    Returns
    -------
    filename : str
        The file name of the requested file specifications.

    Raises
    ------
    ValueError
        If no file name is known for the requested file type.

    Note
    ----
    For standard file types whose name contains tags, `prefix` and `debug`
    are ignored.
    """
    # Pass the caller's synonyms on as well; they were silently dropped before.
    directory, f_type = directory_name(f_type=f_type, dirtype=dirtype, ws_type=ws_type,
                                       directory=directory, default_directory=default_directory,
                                       file_type_to_directory_type=file_type_to_directory_type,
                                       directory_type_to_name=directory_type_to_name,
                                       file_type_synonyms=file_type_synonyms,
                                       return_f_type=True, **kwargs)

    file_type_to_name_ = default_file_type_to_name_from_workspace[ws_type].copy()
    if file_type_to_name is not None:
        file_type_to_name_.update(file_type_to_name)

    # directory_name() only resolves synonyms when it has to infer the directory
    # type, so resolve again here. file_type_synonyms may be None, which crashed
    # the former `file_type_synonyms.keys()` lookup.
    synonyms = dict(default_file_type_synonyms_from_workspace.get(ws_type, default_file_type_synonyms))
    if file_type_synonyms is not None:
        synonyms.update(file_type_synonyms)
    f_type = synonyms.get(f_type, f_type)

    if f_type == 'expression' or expression is not None:
        f_name = Expression(expression).string(values=values)
        # Note: expressions are used for raw data only atm -> no prefix, debug
        # prefix = None
        # debug = None
    else:
        f_name = file_type_to_name_.get(f_type)
        # Report an unknown type before an Expression is built from None.
        if f_name is None:
            raise ValueError(f'Cannot find name for type {f_type}!')
        f_name_expression = Expression(f_name)
        if f_name_expression.tags:
            f_name = f_name_expression.string(values=values)
            # Note: expressions are used for raw data only atm -> no prefix, debug
            prefix = None
            debug = None

    if f_name is None:
        raise ValueError(f'Cannot find name for type {f_type}!')

    if prefix:
        if isinstance(prefix, list):
            prefix = '_'.join(prefix)
        path, file_ = os.path.split(f_name)
        f_name = os.path.join(path, f'{prefix}_{file_}')

    if postfix:
        if isinstance(postfix, list):
            postfix = '_'.join(postfix)
        base, ext = os.path.splitext(f_name)
        f_name = f'{base}_{postfix}{ext}'

    if debug:
        if not isinstance(debug, str):
            debug = 'debug'
        # The debug tag goes in front of the (possibly already prefixed) file name.
        path, file_ = os.path.split(f_name)
        f_name = os.path.join(path, f'{debug}_{file_}')

    if extension:
        extension = extension if extension.startswith('.') else f'.{extension}'
        f_name = f'{os.path.splitext(f_name)[0]}{extension}'

    if directory:
        f_name = clearmap_io.join(directory, f_name)

    return f_name
###############################################################################
# Workspace
###############################################################################
[docs]
class Workspace(object):
    """Class to organize files.

    A workspace bundles a workspace type, a base directory, optional
    prefix / postfix / debug tags and the file type tables, and forwards them
    to the module level :func:`file_name`, :func:`directory_name` and
    :func:`directory_type` functions.
    """

    def __init__(self, ws_type=None, directory=None, prefix=None, postfix=None, debug=None,
                 file_type_to_name=None, file_type_synonyms=None,
                 file_type_to_directory_type=None, directory_type_to_name=None,
                 **kwargs):
        """
        Arguments
        ---------
        ws_type : str or None
            The workspace type; selects the default tables.
        directory : str or None
            The base directory of the workspace.
        prefix, postfix : str or list of str or None
            Default prefix / postfix for file names.
        debug : str, bool or None
            Default debug tag; True is stored as 'debug'.
        file_type_to_name, file_type_synonyms, file_type_to_directory_type, directory_type_to_name : dict or None
            Entries added on top of the defaults of the workspace type.
        **kwargs
            Additional file type to file name entries.
        """
        self._wstype = ws_type  # read-only, exposed via the `wstype` property
        self.prefix = prefix
        self.postfix = postfix
        self.debug = debug  # via the setter, so True / falsy values are normalised

        if directory:
            directory = os.path.normpath(directory)
        self.directory = directory

        self._file_type_to_name = default_file_type_to_name_from_workspace.get(
            ws_type, default_file_type_to_name).copy()
        if file_type_to_name is not None:
            self._file_type_to_name.update(file_type_to_name)
        self._file_type_to_name.update(**kwargs)

        self._file_type_synonyms = default_file_type_synonyms_from_workspace.get(
            ws_type, default_file_type_synonyms).copy()
        if file_type_synonyms is not None:
            # Fixed: the synonyms used to be merged into the file name table.
            self._file_type_synonyms.update(file_type_synonyms)

        self._file_type_to_directory_type = default_file_type_to_directory_type_from_workspace.get(
            ws_type, default_file_type_to_directory_type).copy()
        if file_type_to_directory_type is not None:
            self._file_type_to_directory_type.update(file_type_to_directory_type)

        self._directory_type_to_name = default_directory_type_to_name.copy()
        if directory_type_to_name is not None:
            self._directory_type_to_name.update(directory_type_to_name)

    @property
    def wstype(self):
        """The workspace type (read-only)."""
        return self._wstype

    @property
    def directory(self):
        """The base directory of the workspace, without trailing path separators."""
        return self._directory

    @directory.setter
    def directory(self, path):
        # None means "no directory"; it has no rstrip(), which made the default
        # Workspace() constructor fail.
        self._directory = path.rstrip(os.sep) if path is not None else None

    def load(self, file_path):
        """Loads the workspace configuration from disk"""
        # save() stores the instance dict as a pickled object array, which
        # numpy only reads back with allow_pickle=True.
        # NOTE(security): unpickling can execute arbitrary code; only load
        # workspace files from trusted sources.
        d = np.load(file_path, allow_pickle=True)[0]
        self.__dict__.update(d)

    def save(self, file_path):
        """Saves the workspace configuration to disk"""
        # prevent np to add .npy to a .workspace file
        with open(file_path, "wb") as fid:
            np.save(fid, [self.__dict__])

    @property
    def file_type_to_directory_type(self):
        """Mapping from file type to directory type."""
        return self._file_type_to_directory_type

    @file_type_to_directory_type.setter
    def file_type_to_directory_type(self, value):
        self._file_type_to_directory_type = value

    @property
    def file_type_synonyms(self):
        """Mapping from synonym to canonical file type."""
        return self._file_type_synonyms

    @file_type_synonyms.setter
    def file_type_synonyms(self, value):
        self._file_type_synonyms = value

    @property
    def directory_type_to_name(self):
        """Mapping from directory type to directory."""
        return self._directory_type_to_name

    @directory_type_to_name.setter
    def directory_type_to_name(self, value):
        self._directory_type_to_name = value

    def update(self, *args, **kwargs):
        """Adds or replaces file type to file name entries (same arguments as dict.update)."""
        # FIXME: add check that
        self._file_type_to_name.update(*args, **kwargs)

    @property
    def debug(self):
        """The debug tag (str) or None if debugging is off."""
        return self._debug

    @debug.setter
    def debug(self, value):
        if value is True:  # So that value can be a custom string
            value = 'debug'
        self._debug = value if value else None

    def create_debug(self, f_type, slicing, debug=None, **kwargs):
        """Writes a slice of a file to its debug file name and enables debug mode.

        Arguments
        ---------
        f_type : str
            The file type to slice.
        slicing : slice specification
            Index applied to the source of the non-debug file.
        debug : str, bool or None
            Debug tag to use; defaults to the current tag or 'debug'.
        **kwargs
            Forwarded to :meth:`file_name`.

        Returns
        -------
        The result of `clearmap_io.write` for the debug file.
        """
        if debug is None:
            debug = self.debug if self.debug is not None else 'debug'

        # file_name() falls back to self.debug for any falsy `debug` argument,
        # so the tag has to be cleared on the instance to get the original file.
        previous_debug = self.debug
        self.debug = None
        try:
            source = clearmap_io.as_source(self.file_name(f_type, **kwargs))
        except Exception:
            # Do not leave the workspace with its debug tag silently cleared.
            self.debug = previous_debug
            raise
        self.debug = debug
        return clearmap_io.write(self.file_name(f_type, **kwargs), np.asarray(source[slicing], order='F'))

    def file_name(self, f_type=None, dirtype=None, ws_type=None, directory=None,
                  expression=None, values=None, prefix=None, postfix=None, extension=None, debug=None,
                  file_type_to_name=None, file_type_synonyms=None,
                  file_type_to_directory_type=None, directory_type_to_name=None,
                  **kwargs):
        """Returns the file name for a file type, see the module level :func:`file_name`.

        Every argument that is falsy is replaced by the corresponding setting
        of this workspace; the workspace directory is used as default directory.
        """
        return file_name(f_type=f_type, dirtype=dirtype, ws_type=ws_type or self.wstype,
                         directory=directory, default_directory=self.directory,
                         expression=expression, values=values,
                         prefix=prefix or self.prefix,
                         postfix=postfix or self.postfix,
                         extension=extension, debug=debug or self.debug,
                         file_type_to_name=file_type_to_name or self._file_type_to_name,
                         file_type_synonyms=file_type_synonyms or self.file_type_synonyms,
                         file_type_to_directory_type=file_type_to_directory_type or self.file_type_to_directory_type,
                         directory_type_to_name=directory_type_to_name or self.directory_type_to_name,
                         **kwargs)

    def filename(self, *args, **kwargs):
        """Alias of :meth:`file_name`."""
        return self.file_name(*args, **kwargs)

    def exists(self, f_type, file_type_to_name=None, directory=None, expression=None, values=None, prefix=None,
               extension=None, debug=None, **kwargs):
        """Returns True if the path returned by :meth:`file_name` exists."""
        return os.path.exists(self.file_name(f_type, file_type_to_name=file_type_to_name, directory=directory,
                                             expression=expression, values=values, prefix=prefix,
                                             extension=extension, debug=debug, **kwargs))

    def all_tiles_exist(self, f_type, file_type_to_name=None, directory=None, expression=None, values=None,
                        prefix=None, extension=None, debug=None, **kwargs):
        """Returns True if the number of listed files equals the product of the mosaic shape."""
        files = self.file_list(f_type, file_type_to_name=file_type_to_name, directory=directory,
                               expression=expression, values=values, prefix=prefix, extension=extension,
                               debug=debug, **kwargs)
        # NOTE(review): mosaic_shape() is computed from f_type alone, the other
        # overrides are not forwarded to it.
        return len(files) == self.mosaic_shape(f_type).prod()

    def directory_type(self, f_type=None, ws_type=None,
                       file_type_to_directory_type=None, file_type_synonyms=None, return_f_type=False):
        """Returns the directory type of a file type, see the module level :func:`directory_type`."""
        return directory_type(f_type=f_type, ws_type=ws_type or self.wstype,
                              file_type_to_directory_type=file_type_to_directory_type or self.file_type_to_directory_type,
                              file_type_synonyms=file_type_synonyms or self.file_type_synonyms,
                              return_f_type=return_f_type)

    def directory_name(self, f_type=None, dirtype=None, ws_type=None, directory=None, default_directory=None,
                       file_type_to_directory_type=None,
                       directory_type_to_name=None,
                       file_type_synonyms=None, return_f_type=False, return_dirtype=False):
        """Returns the directory of a file type, see the module level :func:`directory_name`."""
        return directory_name(f_type=f_type, dirtype=dirtype, ws_type=ws_type or self.wstype,
                              directory=directory, default_directory=default_directory or self.directory,
                              file_type_to_directory_type=file_type_to_directory_type or self.file_type_to_directory_type,
                              directory_type_to_name=directory_type_to_name or self.directory_type_to_name,
                              file_type_synonyms=file_type_synonyms or self.file_type_synonyms,
                              return_f_type=return_f_type, return_dirtype=return_dirtype)

    def expression(self, *args, **kwargs):
        """Returns the file name (see :meth:`file_name`) wrapped in an Expression."""
        return Expression(self.file_name(*args, **kwargs))

    def mosaic_shape(self, f_type):
        """Returns, per tag of the file name expression, the largest tag value found plus one."""
        exp = Expression(self.file_name(f_type))
        positions = self.get_positions(f_type)
        tile_axes_ = exp.tag_names()
        indices = [tuple(tv[n] for n in tile_axes_) for tv in positions]
        # noinspection PyArgumentList
        mosaic_shape = np.array(indices).max(axis=0) + 1  # Because 0 indexing
        return mosaic_shape

    def get_positions(self, f_type):
        """Returns the expression values of every file listed for the file type."""
        exp = Expression(self.file_name(f_type))
        files = self.file_list(f_type)
        positions = [exp.values(f) for f in files]
        return positions

    def extension(self, *args, **kwargs):
        """Returns `clearmap_io.file_extension` of the file name."""
        return clearmap_io.file_extension(self.file_name(*args, **kwargs))

    def file_list(self, *args, **kwargs):
        """Returns `clearmap_io.file_list` of the file name."""
        return clearmap_io.file_list(self.file_name(*args, **kwargs))

    def create(self, f_type=None, dtype=None, shape=None, order=None, **kwargs):
        """Creates the file via `clearmap_io.create` and returns its file name."""
        filename = self.file_name(f_type=f_type, **kwargs)
        clearmap_io.create(filename, shape=shape, dtype=dtype, order=order)
        return filename

    def source(self, *args, **kwargs):
        """Returns `clearmap_io.as_source` of the file name."""
        f_name = self.file_name(*args, **kwargs)
        return clearmap_io.as_source(f_name)

    def read(self, *args, **kwargs):
        """Returns `clearmap_io.read` of the file name."""
        return clearmap_io.read(self.file_name(*args, **kwargs))

    def plot(self, f_type, **kwargs):
        """Plots the file via `q_plot_3d.plot`."""
        return q_plot_3d.plot(self.file_name(f_type, **kwargs))

    def write(self, *args, **kwargs):
        """Writes data to a workspace file via `clearmap_io.write`.

        The data is taken from the `data` keyword or, if absent, from the last
        positional argument; the remaining arguments are passed to :meth:`filename`.
        """
        if 'data' in kwargs:
            data = kwargs.pop('data')
        else:
            data = args[-1]
            args = args[:-1]
        return clearmap_io.write(self.filename(*args, **kwargs), data)

    def __format_pattern_line(self, files, expression, tag_names, tile_axes_):
        """Formats one line of :meth:`info` for a tagged file pattern."""
        tile_positions = [expression.values(f) for f in files]
        tile_positions = [tuple(tv[n] for n in tile_axes_) for tv in tile_positions]
        tile_lower = tuple(np.min(tile_positions, axis=0))
        tile_upper = tuple(np.max(tile_positions, axis=0))
        tag_names = tuple(tag_names)
        relative_file_pattern = os.path.relpath(expression.string(), start=self.directory)
        pattern_line = f'{relative_file_pattern} {{{len(files)} files, {tag_names}: {tile_lower} -> {tile_upper}}}\n'
        return pattern_line

    def __str__(self):
        s = "Workspace"  # self.__class__.__name__
        if self.wstype is not None:
            s += f'[{self.wstype}]'
        if self.prefix is not None:
            s += f'({self.prefix})'
        if self.directory is not None:
            s += f'{{{self.directory}}}'
        if self.debug is not None:
            s += f'[{self.debug}]'
        return s

    def __repr__(self):
        return self.__str__()

    def info(self, tile_axes=None, check_extensions=True, check_directory=True):  # REFACTOR:
        """Prints an overview of the workspace directories and files.

        Arguments
        ---------
        tile_axes : list of str or None
            The tag names to report for tagged file names. If None, all tags
            of the expression are used.
        check_extensions : bool
            If True, tagged file names are listed for every extension found,
            otherwise only for the extension of the standard file name.
        check_directory : bool
            If True, entries that are directories are reported as 'directory'.

        Raises
        ------
        ValueError
            If `tile_axes` and the tags of a file name expression differ.
        """
        out = f'{self}\n'

        out += 'directories:\n'
        # default=0 keeps the overview working for empty tables.
        len_dirtype = max((len(dirtype) for dirtype in self.directory_type_to_name), default=0)
        for dirtype in self.directory_type_to_name:
            dirname = self.directory_name(dirtype=dirtype)
            out += f' [{dirtype : >{len_dirtype}}]: {dirname}\n'

        out += '\nfiles:\n'
        len_f_type = max((len(f_type) for f_type in self._file_type_to_name), default=0)
        header = f' [{{:{len_dirtype}}}] {{:{len_f_type}}}'

        for f_type, f_names in self._file_type_to_name.items():
            _, dirtype = self.directory_name(f_type=f_type, return_dirtype=True)

            if Expression(f_names).tags:
                # Tagged file name: summarise the files per extension.
                if check_extensions:
                    extensions = list(np.unique([os.path.splitext(f)[-1] for f in
                                                 self.file_list(f_type, extension='*')]))
                else:
                    extensions = [self.extension(f_type)]

                if not extensions:
                    out += f'{header.format(dirtype, f_type)}: no file\n'
                else:
                    for i, extension in enumerate(extensions):
                        expression = Expression(self.file_name(f_type, extension=extension))
                        tag_names = expression.tag_names()
                        tile_axes_ = tile_axes if tile_axes is not None else tag_names
                        for n in tile_axes_:
                            if n not in tag_names:
                                raise ValueError(f'The expression does not have the named pattern {n}')
                        for n in tag_names:
                            if n not in tile_axes_:
                                raise ValueError(f'The expression has the named pattern {n} '
                                                 f'that is not in tile_axes={tile_axes_}')

                        # construct tiling
                        files = clearmap_io.file_list(expression)
                        if files:
                            # Only the first line of a file type carries the header.
                            if i == 0:
                                out += f'{header.format(dirtype, f_type)}: '
                            else:
                                out += f'{header.format("", "")} '
                            pattern_line = self.__format_pattern_line(files, expression, tag_names, tile_axes_)
                            out += pattern_line
            else:
                # Plain file name: list the file itself and its postfixed variants.
                f_name = self.file_name(f_type)
                if check_directory and clearmap_io.is_directory(f_name):
                    out += f'{header.format(dirtype, f_type)}: directory\n'
                else:
                    files = []
                    if clearmap_io.is_file(f_name):
                        files += [f_name]
                    f_name = self.file_name(f_type, postfix='*')
                    files += clearmap_io.file_list(f_name)
                    if files:
                        files = [os.path.relpath(f, start=self.directory) for f in files]
                        out += f'{header.format(dirtype, f_type)}: {files[0]}\n'
                        for f in files[1:]:
                            out += f'{header.format("", "")} {f}\n'
                    else:
                        out += f'{header.format(dirtype, f_type)}: no file\n'

        print(out)  # TODO: add print option or return s