Source code for ClearMap.Alignment.Elastix

# -*- coding: utf-8 -*-
"""
Elastix
=======

Interface to Elastix for alignment of volumetric data.

The elastix documentation can be found `here <https://elastix.lumc.nl/>`_.

In essence, a transformation :math:`T(x)` is sought so that for a fixed image 
:math:`F(x)` and a moving image :math:`M(x)`:

.. math::
    F(x) = M(T(x))

Once the map :math:`T` is estimated via elastix, transformix maps an image
:math:`I(x)` from the moving image frame to the fixed image frame, i.e.:

.. math::
    I(x) \\rightarrow I(T(x)) 

To register an image onto a reference image, the fixed image is typically 
chosen to be the image to be registered, while the moving image is the
reference image. In this way a position :math:`x` in the data is mapped to the corresponding
reference position :math:`T(x)` via transformix as:

.. math::
    x \\rightarrow T(x)

Summary
-------
    * elastix finds a transformation :math:`T: \\mathrm{fixed image} \\rightarrow \\mathrm{moving image}`
    * the fixed image is typically the image to be registered
    * the moving image is typically the reference image
    * the result folder may contain an image (mhd file) that is :math:`T^{-1}(\\mathrm{moving})`,
      i.e. has the size of the fixed image
    * transformix applied to data gives :math:`T^{-1}(\\mathrm{data})` !
    * transformix applied to points gives :math:`T(\\mathrm{points})` !
    * point arrays are assumed to be in (x,y,z) coordinates consistent with (x,y,z) array representation
      of images in ClearMap

In terms of atlas alignment this reads as

Atlas alignment
---------------
    * elastix finds a transformation :math:`T: \\mathrm{data} \\rightarrow \\mathrm{atlas}`
    * the result folder may contain an image (mhd file) that is :math:`T^{-1}(\\mathrm{atlas})`,
      i.e. is the atlas transformed to the data.
    * transformix applied to the atlas gives :math:`T^{-1}(\\mathrm{atlas})`, i.e. the atlas mapped onto the data !
    * transformix applied to points gives :math:`T(\\mathrm{points})`, i.e. the points in the atlas reference frame !

Main routines are: :func:`align`, :func:`transform`.
    
See Also
--------
    `Elastix documentation <https://elastix.lumc.nl/doxygen/index.html>`_
    :mod:`~ClearMap.Alignment.Resampling`
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Public License v3 (see LICENSE)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'https://idisco.info'
__download__ = 'https://www.github.com/ChristophKirst/ClearMap2'


import os
import sys
import subprocess
import platform
import tempfile
import shutil
import re
from io import UnsupportedOperation

import numpy as np

import multiprocessing as mp

from ClearMap.Utils.exceptions import ClearMapException

import ClearMap.IO.IO as io
import ClearMap.Settings as settings

from .Transformations.Transformation import TransformationBase

##############################################################################
# Initialization and Settings
##############################################################################

elastix_binary = None
"""The elastix executable.

Notes
-----
  - setup in :func:`initialize_elastix`
"""

elastix_lib = None
"""Path to the elastix library.

Notes
-----
  - setup in :func:`initialize_elastix`
"""

transformix_binary = None
"""The transformix executable.

Notes
-----
  - setup in :func:`initialize_elastix`
"""

initialized = False
"""Status of the elastix binaries and paths.

Notes
-----
  - setup in :func:`initialize_elastix`
"""






[docs] def set_elastix_library_path(elastix_lib_path=None): """Add elastix library path to the LD_LIBRARY_PATH variable in linux. Arguments --------- elastix_lib_path : str or None Path to elastix root directory. If None :const:`ClearMap.Settings.elastix_path` is used. """ os_name = platform.system().lower() if os_name.startswith('linux'): lib_var_name = 'LD_LIBRARY_PATH' elif os_name.startswith('darwin'): lib_var_name = 'DYLD_LIBRARY_PATH' else: raise ValueError(f'Unknown OS {os_name}') print(f'OS: {os_name}, library variable name: {lib_var_name}') if lib_var_name in os.environ: lib_path = os.environ[lib_var_name] print(f'Variable {lib_var_name} exists, patching with {lib_path}') if elastix_lib_path not in lib_path.split(':'): os.environ[lib_var_name] = f'{elastix_lib_path}:{lib_path}' else: print(f'Variable {lib_var_name} not found, adding elastix lib folder: {elastix_lib}') os.environ[lib_var_name] = elastix_lib_path
[docs] def initialize_elastix(path=None): """Initialize all paths and binaries of elastix Arguments --------- path (str or None): path to elastix root directory, if None :const:`ClearMap.Settings.elastix_path` is used. See also -------- :const:`elastix_binary`, :const:`elastix_lib`, :const:`transformix_binary`, :const:`initialized`, :func:`set_elastix_library_path` """ global elastix_binary, elastix_lib, transformix_binary, initialized if path is None and settings.elastix_path is not None: path = settings.elastix_path else: raise RuntimeError('Cannot find elastix path!') elastix_binary = search_elx_bin(path, 'elastix') transformix_binary = search_elx_bin(path, 'transformix') elastix_lib = search_elx_lib(path) set_elastix_library_path(elastix_lib) # FIXME: check if needs global elastix_lib ?? initialized = True print(f'Elastix successfully initialized from path: {path}') return path
[docs] def search_elx_lib(path): for sub_dir in ('lib', 'bin'): lib_path = os.path.join(path, sub_dir) if os.path.exists(lib_path): return lib_path else: raise ClearMapException(f'Cannot find elastix libs in {lib_path} set path in Settings.py accordingly!')
[docs] def search_elx_bin(path, bin_type): elx = os.path.join(path, 'bin', bin_type) if os.path.exists(elx): return elx else: raise RuntimeError(f'Cannot find elastix binary {elx}, set path in Settings.py accordingly!')
[docs] def get_elastix_version(): """Returns the elastix version. Returns ------- version : str The elastix version. """ check_elastix_initialized() cmd = [elastix_binary, '--version'] res = subprocess.check_output(cmd) res = res.decode('utf-8') version = float(res.splitlines()[0].split(':')[-1].strip()) return version
if 'sphinx' in sys.modules.keys(): print('Import for documentation: elastix not initialized!') else: initialize_elastix()
[docs] def check_elastix_initialized(): """Checks if elastix is initialized Returns ------- initialized : bool True if elastix paths are set. """ if not initialized: raise RuntimeError("Elastix not initialized: run initialize_elastix(path) with proper path to elastix first") return True
############################################################################### # Basic interface routines ###############################################################################
[docs] def transform_file(result_directory): """Finds and returns the transformation parameter file. Arguments --------- result_directory : str Path to directory of elastix results. Returns ------- filename : str File name of the first transformation parameter file. Notes ----- In case of multiple transformation parameter files the top level file is returned. """ files = os.listdir(result_directory) files = [x for x in files if re.match('TransformParameters.\d.txt', x)] files.sort() if not files: raise RuntimeError(f'Cannot find a valid transformation file in {result_directory}!') return os.path.join(result_directory, files[-1])
[docs] def transform_directory_and_file(transform_parameter_file=None, transform_directory=None): """Determines transformation directory and file from either. Arguments --------- transform_parameter_file : str or None File name of the transformation parameter file. transform_directory : str or None Directory to the transformation parameter. Returns ------- transform_parameter_file : str File name of the transformation parameter file. transform_directory : str Directory to the transformation parameter. Notes ----- Only one of the two arguments need to be specified. """ if not transform_parameter_file: if not transform_directory: raise ValueError('Neither the alignment directory nor the transformation parameter file is specified!') transform_parameter_dir = transform_directory transform_parameter_file = transform_file(transform_parameter_dir) else: transform_parameter_dir = os.path.split(transform_parameter_file) transform_parameter_dir = transform_parameter_dir[0] transform_parameter_file = transform_parameter_file return transform_parameter_dir, transform_parameter_file
[docs] def set_path_transform_files(result_directory): """Replaces relative with absolute path in the parameter files in the result directory. Arguments --------- result_directory : str Path to directory of elastix results. Notes ----- When elastix is not run in the directory of the transformation files the absolute path needs to be given in each transformation file to point to the subsequent transformation files. This is done via this routine. """ files = os.listdir(result_directory) files = [x for x in files if re.match('TransformParameters.\d.txt', x)] files.sort() if not files: raise RuntimeError(f'Cannot find a valid transformation file in {result_directory}!') rec = re.compile("\(InitialTransformParametersFileName \"(?P<parname>.*)\"\)") for f in files: fh, tmpfn = tempfile.mkstemp() ff = os.path.join(result_directory, f) with open(tmpfn, 'w') as newfile, open(ff) as parfile: for line in parfile: # print line m = rec.match(line) if m is not None: pn = m.group('parname') if pn != 'NoInitialTransform': pathn, filen = os.path.split(pn) filen = os.path.join(result_directory, filen) newfile.write(line.replace(pn, filen)) else: newfile.write(line) else: newfile.write(line) os.close(fh) os.remove(ff) shutil.move(tmpfn, ff)
[docs] def set_metric_parameter_file(parameter_file, metric): """Replaces the metric in the parameter file. Arguments --------- parameter_file : str The parameter file name. metric : str The metric to use. Notes ----- Used to replace the metric when inverse transform is estimated. """ fh, tmpfn = tempfile.mkstemp() rec = re.compile("\(Metric \"(?P<parname>.*)\"\)") mset = False with open(tmpfn, 'w') as newfile, open(parameter_file) as parfile: for line in parfile: # print line m = rec.match(line) if m is not None: pn = m.group('parname') newfile.write(line.replace(pn, metric)) mset = True else: newfile.write(line) if not mset: newfile.write(f'(Metric "{metric}")\n') os.close(fh) # FIXME: try finally os.remove(parameter_file) shutil.move(tmpfn, parameter_file)
[docs] def result_data_file(result_directory): """Returns the mhd result file in a result directory. Arguments --------- result_directory : str Path to elastix result directory. Returns ------- result_file : str The mhd file in the result directory. """ files = os.listdir(result_directory) files = [x for x in files if re.match('.*.mhd', x)] files.sort() if not files: raise RuntimeError('Cannot find a valid result data file in ' + result_directory) return os.path.join(result_directory, files[0])
[docs] def transform_file_size_and_spacing(transform_file): """Parse the image size and spacing from a transformation parameter file. Arguments --------- transform_file : str File name of the transformix parameter file. Returns ------- size: tuple The image size. spacing : tuple The image spacing. """ resi = re.compile("\(Size (?P<size>.*)\)") resp = re.compile("\(Spacing (?P<spacing>.*)\)") si = None sp = None with open(transform_file) as parfile: for line in parfile: m = resi.match(line) if m is not None: pn = m.group('size') si = pn.split() m = resp.match(line) if m is not None: pn = m.group('spacing') sp = pn.split() parfile.close() si = tuple(float(x) for x in si) sp = tuple(float(x) for x in sp) return si, sp
[docs] def set_transform_file_size_and_spacing(transform_file, size, spacing): """Replaces size and scale in the transformation parameter file Arguments --------- transform_file : str Transformation parameter file. size : tuple The new image size. spacing : tuple The new image spacing. """ resi = re.compile("\(Size (?P<size>.*)\)") resp = re.compile("\(Spacing (?P<spacing>.*)\)") fh, tmpfn = tempfile.mkstemp() si = tuple(int(x) for x in size) with open(transform_file) as parfile, open(tmpfn, 'w') as newfile: for line in parfile: m = resi.match(line) if m is not None: newfile.write("(Size %d %d %d)" % si) else: m = resp.match(line) if m is not None: newfile.write("(Spacing %d %d %d)" % spacing) else: newfile.write(line) os.remove(transform_file) shutil.move(tmpfn, transform_file)
[docs] def rescale_size_and_spacing(size, spacing, scale): """Rescales the size and spacing Arguments --------- size : tuple The image size. spacing : tuple The image spacing. scale : tuple, number The scale factor. Returns ------- size : tuple The scaled image size. spacing : tuple The scaled image spacing. """ if not isinstance(scale, (tuple, list)): scale = (scale,) * len(size) si = tuple(int(x * s) for x, s in zip(size, scale)) sp = tuple(sp / sc for sp, sc in zip(spacing, scale)) return si, sp
############################################################################## # Elastix Runs ##############################################################################
[docs] def align(fixed_image, moving_image, affine_parameter_file, bspline_parameter_file=None, result_directory=None, processes=None, workspace=None, moving_landmarks_path=None, fixed_landmarks_path=None): """ Align images using elastix, estimates a transformation :math:`T:` fixed image :math:`\\rightarrow` moving image. Arguments --------- fixed_image : str Image source of the fixed image (typically the reference image). moving_image : str Image source of the moving image (typically the image to be registered). affine_parameter_file : str or None Elastix parameter file for the primary affine transformation. bspline_parameter_file : str or None Elastix parameter file for the secondary non-linear transformation. result_directory : str or None Elastic result directory. processes : int or None Number of threads to use. workspace : Workspace The workspace passed to the function to cath the executor to cancel the execution of the computation moving_landmarks_path : str Path to the moving landmarks file. fixed_landmarks_path : str Path to the fixed landmarks file. Returns ------- result_directory : str Path to elastix result directory. """ processes = processes if processes is not None else mp.cpu_count() check_elastix_initialized() # result directory result_directory = result_directory if result_directory is not None else tempfile.gettempdir() if not os.path.exists(result_directory): os.mkdir(result_directory) check_spaces(affine_parameter_file, bspline_parameter_file, fixed_image, moving_image, moving_landmarks_path, fixed_landmarks_path, result_directory) # run elastix cmd = [elastix_binary, '-threads', str(processes), '-m', f'{moving_image}', '-f', f'{fixed_image}'] if affine_parameter_file is not None: cmd.extend(['-p', f'{affine_parameter_file}']) if bspline_parameter_file is not None: cmd.extend(['-p', f'{bspline_parameter_file}']) if moving_landmarks_path is not None or fixed_landmarks_path is not None: cmd.extend(['-mp', f'{moving_landmarks_path}', '-fp', f'{fixed_landmarks_path}']) cmd.extend(['-out', f'{result_directory}']) try: with subprocess.Popen(cmd, stdout=sys.stdout, stderr=sys.stdout) as proc: # FIXME: check if we need an "if not sys.stdout.fileno" if workspace is not None: workspace.process = proc except UnsupportedOperation: try: with subprocess.Popen(cmd) as proc: if workspace is not None: workspace.process = proc except (subprocess.SubprocessError, OSError) as err: raise ClearMapException(f'Align: failed executing: {" ".join(cmd)}') from err except (subprocess.SubprocessError, OSError) as err: raise ClearMapException(f'Align: failed executing: {" ".join(cmd)}') from err finally: if workspace is not None: workspace.process = None return result_directory
[docs] def check_spaces(*paths): for p in paths: if p is not None and ' ' in p: raise ValueError(f'Could not run elastix with path containing spaces: {p}')
[docs] def transform(source, sink='transformix', transform_parameter_file=None, transform_directory=None, result_directory=None): """Transform a raw data set to reference using the elastix alignment results. Arguments --------- source : str or array Image source to be transformed. sink : str or None Image sink to save transformed image to. If 'transformix' return the default name of the data file generated by transformix. transform_parameter_file : str or None Parameter file for the primary transformation. If None, the file is determined from the transform_directory. transform_directory : str or None Result directory of elastix alignment. If None the transform_parameter_file has to be given. result_directory : str or None The directory for the transformix results. Returns ------- transformed : array or st Array or file name of the transformed data. Note ---- If the map determined by elastix is :math:`T: \\mathrm{fixed} \\rightarrow \\mathrm{moving}`, transformix on data works as :math:`T^{-1}(\\mathrm{data})`. """ check_elastix_initialized() # image source = io.as_source(source) if isinstance(source, io.tif.Source): img_name = source.location delete_image = None else: img_name = os.path.join(tempfile.gettempdir(), 'elastix_input.tif') io.write(img_name, source) delete_image = img_name # result directory delete_result_directory = None if result_directory is None: result_dirname = os.path.join(tempfile.gettempdir(), 'elastix_output') delete_result_directory = result_dirname else: result_dirname = result_directory if not os.path.exists(result_dirname): os.makedirs(result_dirname) # transformation parameter transform_parameter_dir, transform_parameter_file = transform_directory_and_file( transform_parameter_file=transform_parameter_file, transform_directory=transform_directory) set_path_transform_files(transform_parameter_dir) # transformix -in inputImage.ext -out outputDirectory -tp TransformParameters.txx check_spaces(img_name, result_dirname, transform_parameter_file) cmd = f'{transformix_binary} -in {img_name} -out {result_dirname} -tp {transform_parameter_file}' res = os.system(cmd) if res != 0: raise RuntimeError('transform_data: failed executing: ' + cmd) # read data and clean up if delete_image is not None: os.remove(delete_image) if sink == 'transformix': return result_data_file(result_dirname) elif sink is None: result_file = result_data_file(result_dirname) result = io.read(result_file) elif isinstance(sink, str): result_file = result_data_file(result_dirname) result = io.convert(result_file, sink) else: raise RuntimeError('transform_data: sink not valid!') if delete_result_directory is not None: shutil.rmtree(delete_result_directory) return result
[docs] def deformation_field(sink='transformix', transform_parameter_file=None, transform_directory=None, result_directory=None): """Create the deformation field T(x) - x. Arguments --------- sink : str or None Image sink to save the transformation field; if 'transformix' return the default name of the data file generated by transformix. transform_parameter_file : str or None Parameter file for the primary transformation, if None, the file is determined from the transform_directory. transform_directory : str or None Result directory of elastix alignment, if None the transform_parameter_file has to be given. result_directory : str or None The directory for the transformix results. Returns ------- deformation_field : array or str Array or file name of the deformation field data. Note ---- The map determined by elastix is :math:`T \\mathrm{fixed} \\rightarrow \\mathrm{moving}`. """ check_elastix_initialized() # result directory delete_result_directory = None if result_directory is None: result_dirname = os.path.join(tempfile.gettempdir(), 'elastix_output') delete_result_directory = result_dirname else: result_dirname = result_directory if not os.path.exists(result_dirname): os.makedirs(result_dirname) # setup transformation transform_parameter_dir, transform_parameter_file = transform_directory_and_file( transform_parameter_file=transform_parameter_file, transform_directory=transform_directory) set_path_transform_files(transform_parameter_dir) # transformix -in inputImage.ext -out outputDirectory -tp TransformParameters.txt check_spaces(result_dirname, transform_parameter_file) cmd = f'{transformix_binary} -def all -out {result_dirname} -tp {transform_parameter_file}' res = os.system(cmd) if res != 0: raise RuntimeError(f'deformation_field: failed executing: {cmd}') # read result and clean up if sink == 'transformix': return result_data_file(result_dirname) elif sink is None: result_file = result_data_file(result_dirname) result = io.read(result_file) elif isinstance(sink, str): result_file = result_data_file(result_dirname) result = io.convert(result_file, sink) else: raise RuntimeError('deformation_field: sink not valid!') if delete_result_directory is not None: shutil.rmtree(delete_result_directory) return result
[docs] def deformation_distance(deformation_field, sink=None, scale=None): """Compute the distance field from a deformation vector field. Arguments --------- deformation_field : str or array Source of the deformation field determined by :func:`deformation_field`. sink : str or None Image sink to save the deformation field to. scale : tuple or None Scale factor for each dimension, if None = (1,1,1). Returns ------- deformation_distannce : array or st Array or file name of the deformation distance data. """ deformation_field = io.read(deformation_field) df = np.square(deformation_field) if scale is not None: for i in range(3): df[:, :, :, i] = df[:, :, :, i] * (scale[i] * scale[i]) df = np.sqrt(np.sum(df, axis=3)) return io.write(sink, df)
############################################################################### # Point transformations ###############################################################################
[docs] def write_points(filename, points, indices=False, binary=True): """Write points as elastix/transformix point file Arguments --------- filename : str File name of the elastix point file. points : array or str Source of the points. indices : bool Write as pixel indices or physical coordinates. Returns ------- filename: str File name of the elastix point file. """ points = io.read(points) if binary: with open(filename, 'wb') as point_file: if get_elastix_version() < 5: dtype = np.int64 else: dtype = np.uint64 if indices: np.array(1, dtype=dtype).tofile(point_file) else: np.array(0, dtype=dtype).tofile(point_file) num_points = np.array(len(points), dtype=np.int64) num_points.tofile(point_file) points = np.asarray(points, dtype=np.double) points.tofile(point_file) else: with open(filename, 'w') as point_file: if indices: point_file.write('index\n') else: point_file.write('point\n') point_file.write(f'{points.shape[0]}\n') np.savetxt(point_file, points, delimiter=' ', newline='\n', fmt='%.5e') return filename
[docs] def read_points(filename, indices=False, binary=True): """Parses the output points from the output file of transformix Arguments --------- filename : str File name of the transformix output file. indices : bool If True return pixel indices otherwise float coordinates. Returns -------- points : array The point coordinates. """ if binary: with open(filename) as f: if get_elastix_version() < 5: dtype = np.int64 else: dtype = np.uint64 _ = np.fromfile(f, dtype=dtype, count=1)[0] # Discard the index flag num_points = np.fromfile(f, dtype=dtype, count=1)[0] if num_points == 0: points = np.zeros((0, 3)) else: points = np.fromfile(f, dtype=np.double) points = np.reshape(points, (num_points, 3)) else: # text file with open(filename) as f: lines = f.readlines() num_points = len(lines) if num_points == 0: points = np.zeros((0, 3)) else: points = np.zeros((num_points, 3)) for k, line in enumerate(lines): ls = line.split() if indices: for i in range(0, 3): points[k, i] = float(ls[i + 22]) else: for i in range(0, 3): points[k, i] = float(ls[i + 30]) return points
[docs] def transform_points(source, sink=None, transform_parameter_file=None, transform_directory=None, indices=False, result_directory=None, temp_file=None, binary=True): """Transform coordinates math:`x` via elastix estimated transformation to :math:`T(x)`. Arguments --------- source : str Source of the points. sink : str or None Sink for transformed points. transform_parameter_file : str or None Parameter file for the primary transformation. If None, the file is determined from the transform_directory. transform_directory : str or None Result directory of elastix alignment. If None the transform_parameter_file has to be given. indices : bool If True use points as pixel coordinates otherwise spatial coordinates. result_directory : str or None Elastic result directory. temp_file : str or None Optional file name for the elastix point file. Returns ------- points : array or st Array or file name of transformed points. Note ---- The transformation is from the fixed image coordinates to the moving image coordinates. """ check_elastix_initialized() # input point file if temp_file is None: extension = "bin" if binary else "txt" temp_file = os.path.join(tempfile.gettempdir(), f'elastix_input.{extension}') delete_point_file = None if isinstance(source, str): if len(source) > 3 and source[-3:] in ['txt', 'bin']: binary = source[-3:] == 'bin' point_file = source else: points = io.read(source) point_file = temp_file delete_point_file = temp_file write_points(point_file, points, indices=indices, binary=binary) elif isinstance(source, np.ndarray): point_file = temp_file delete_point_file = temp_file write_points(point_file, source, indices=indices, binary=binary) else: raise RuntimeError(f'transform_points: source not string or array but "{type(source)}"!') # result directory if result_directory is None: out_dirname = os.path.join(tempfile.gettempdir(), 'elastix_output') delete_result_directory = out_dirname else: out_dirname = result_directory delete_result_directory = None if not os.path.exists(out_dirname): os.makedirs(out_dirname) # transform transform_parameter_dir, transform_parameter_file = transform_directory_and_file( transform_parameter_file=transform_parameter_file, transform_directory=transform_directory) set_path_transform_files(transform_parameter_dir) # run transformix check_spaces(point_file, out_dirname, transform_parameter_file) cmd = f'{transformix_binary} -def {point_file} -out {out_dirname} -tp {transform_parameter_file}' print(cmd) res = os.system(cmd) if res != 0: raise RuntimeError(f'failed executing {cmd}, return code: {res}') # read data and clean up if delete_point_file is not None: os.remove(delete_point_file) # read data / file extension = 'bin' if binary else 'txt' output_file = f'outputpoints.{extension}' if sink == []: # return sink as file name return os.path.join(out_dirname, output_file) else: transformed_points = read_points(os.path.join(out_dirname, output_file), indices=indices, binary=binary) if delete_result_directory: shutil.rmtree(delete_result_directory) return io.write(sink, transformed_points)
[docs] def inverse_transform(fixed_image, affine_parameter_file, bspline_parameter_file=None, transform_parameter_file=None, transform_directory=None, result_directory=None, processes=None): """Estimate inverse transformation :math:`T^{-1}:` moving image :math:`\\rightarrow` fixed image. Arguments --------- fixed_image : str Image source of the fixed image (typically the reference image) affine_parameter_file : str The parameter file for the original affine transformation. bspline_parameter_file : str The parameter file for the original b-spline transformation. transform_directory : str or None Elastic result directory of the original transform. transform_parameter_file : str or None Elastic transformation parameter file of the original transform. result_directory : str or None Elastic result directory of the inverse transform. processes : int or None Number of processes to use. Returns ------- result_directory : str Path to elastix result directory. """ check_elastix_initialized() # result directory if result_directory is None: result_directory = tempfile.gettempdir() if not os.path.exists(result_directory): os.mkdir(result_directory) # transformation files transform_parameter_dir, transform_parameter_file = transform_directory_and_file( transform_parameter_file=transform_parameter_file, transform_directory=transform_directory) set_path_transform_files(transform_parameter_dir) # set metric of the parameter files if bspline_parameter_file is not None: _, bspline_file = os.path.split(bspline_parameter_file) bspline_file = os.path.join(result_directory, bspline_file) shutil.copyfile(bspline_parameter_file, bspline_file) set_metric_parameter_file(bspline_file, 'DisplacementMagnitudePenalty') else: bspline_file = None if affine_parameter_file is not None: _, affine_file = os.path.split(affine_parameter_file) affine_file = os.path.join(result_directory, affine_file) shutil.copyfile(affine_parameter_file, affine_file) set_metric_parameter_file(affine_file, 'DisplacementMagnitudePenalty') else: affine_file = None # run elastix check_spaces(fixed_image, result_directory, transform_parameter_file, affine_file, bspline_file) cmd = f'{elastix_binary} -threads {processes} -m {fixed_image} -f {fixed_image} -t0 {transform_parameter_file} ' # FIXME: fixed_image is used twice if affine_file is not None: cmd += f'-p {affine_file} ' if bspline_file is not None: cmd += f'-p {bspline_file} ' cmd += f'-out {result_directory}' res = os.system(cmd) if res != 0: raise RuntimeError(f'inverse_transform: failed executing: {cmd}') return result_directory
############################################################################### # Transformation interface ###############################################################################
[docs] class ElastixTransformation(TransformationBase): """Elastix Transformation Note ---- The transformation represented is :math:`T` as estimated by elastix. For data transformix applies :math:`T^{-1}` and for points :math:`T` """ ttype = 'Elastix' def __init__(self, transform_directory=None, transform_parameter_file=None, inverse=False): super().__init__(inverse=inverse) transform_directory, transform_parameter_file = transform_directory_and_file( transform_parameter_file=transform_parameter_file, transform_directory=transform_directory) self.transform_directory = transform_directory self.transform_parameter_file = transform_parameter_file
[docs] def transform_data(self, source, inverse=False, **kwargs): inverse = self.get_inverse(inverse) # Note: transformix maps atlas -> data via T^-1 if not inverse: raise NotImplementedError() else: return transform(source, transform_parameter_file=self.transform_parameter_file, transform_directory=self.transform_directory, **kwargs)
[docs] def transform_points(self, source, inverse=False, **kwargs): inverse = self.get_inverse(inverse) if not inverse: return transform_points(source, transform_parameter_file=self.transform_parameter_file, transform_directory=self.transform_directory, **kwargs) else: raise NotImplementedError()
[docs] def to_dict(self) -> dict: dictionary = super().to_dict() dictionary.update(transform_directory=self.transform_directory, transform_parameter_file=self.transform_parameter_file) return dictionary
def __repr__(self): return '%s[%s]' % (super().__repr__(), self.transform_directory)
############################################################################### # Tests ############################################################################### def _test(): import os import numpy as np import ClearMap.Settings as settings import ClearMap.Alignment.Elastix as elx # reload(elx) path = os.path.join(settings.test_data_path, 'Elastix') result_directory = os.path.join(path, 'elastix_template_to_ref') elx.transform_file(result_directory) shape = np.array([432, 512, 229]) points = np.random.rand(30, 3) * 0.25 * shape + 0.5 * shape points = np.round(points) points = np.array(points, dtype=int) test_txt = elx.transform_points(points, transform_directory=result_directory, binary=False, indices=False) test = elx.transform_points(points, transform_directory=result_directory, binary=True, indices=False) print(np.allclose(test_txt, test))