Source code for ClearMap.ImageProcessing.Binary.Smoothing

# -*- coding: utf-8 -*-
"""
Smoothing
=========

Smooth a binary image based on the local configuration of voxels in a cube.

See also
--------
The algorithm has similarities to the skeletonization algorithm using
parallel thinning (:mod:`~ClearMap.ImageProcessing.Skeletonization`).
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Public License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'https://idisco.info'
__download__ = 'https://www.github.com/ChristophKirst/ClearMap2'

import os
import functools

import numpy as np
import scipy.ndimage as ndi

import multiprocessing as mp

import ClearMap.IO.IO as io
import ClearMap.IO.FileUtils as fu

import ClearMap.ImageProcessing.Topology.Topology3d as t3d

import ClearMap.ParallelProcessing.BlockProcessing as bp
import ClearMap.ParallelProcessing.DataProcessing.ArrayProcessing as ap

import ClearMap.Utils.Timer as tmr

###############################################################################
# Smoothing by number of neighbours
###############################################################################


[docs] def smooth_by_counting(source, sink = None, low = 5, high = 10, shape = None): """Smooth binary image by counting neighbours. Arguments --------- source : array The binary source to smooth. sink : array or None. The sink to write the smoothed source to. low : int If a voxel has less then this number of 26-neighbours it is set to False. high : int If a voxel has more then this number of 26-neighbours it is set to True. shape : tuple of int or None The shape of the square structuring element to consider. Returns ------- sink : array The smoothed sbinary source. Note ---- The algorithm uses a sequence of 1d convoluions for speed, allowing only rectangular like structuring elements. """ ndim = source.ndim if shape is None: shape = (3,) * ndim filtered = source for d in range(ndim): weights = np.ones(shape[d], dtype = int) temp = np.zeros(source.shape, dtype = 'uint8') ap.correlate1d(filtered, weights, sink=temp, axis=d, mode='constant', cval=0) filtered = temp if sink is None: sink = np.array(source, dtype = bool) sink[filtered >= high] = True sink[filtered < low] = False return sink
############################################################################### ### Topological smoothing ###############################################################################
[docs] def rotations_faces(cube): U = cube.copy() N = t3d.rotate(cube, axis=0, steps=1) W = t3d.rotate(cube, axis=1, steps=1) UNW = [U,N,W] DSE = [t3d.reflect(X) for X in UNW] return UNW + DSE
[docs] def rotations_edges(cube): UN = cube.copy() UE = t3d.rotate(cube, axis=2, steps=1) US = t3d.rotate(cube, axis=2, steps=2) UW = t3d.rotate(cube, axis=2, steps=3) NW = t3d.rotate(cube, axis=1, steps=3) NE = t3d.rotate(cube, axis=1, steps=1) R = [t3d.reflect(X) for X in [UN,UE,US,UW,NW,NE]] return [UN,UE,US,UW,NW,NE] + R
[docs] def rotations_nodes(cube): UNW = cube.copy() UNE = t3d.rotate(cube, axis=2,steps=1) USE = t3d.rotate(cube, axis=2,steps=2) USW = t3d.rotate(cube, axis=2,steps=3) R = [t3d.reflect(X) for X in [UNW,UNE,USE,USW]] return [UNW,UNE, USE,USW] + R
[docs] def rotations_node_faces(cube): U_UNW = cube.copy() #U1 U_UNE = t3d.rotate(cube, axis=2, steps=1) #U3 U_USE = t3d.rotate(cube, axis=2, steps=2) #U5 U_USW = t3d.rotate(cube, axis=2, steps=3) #U7 Us = [U_UNW, U_UNE, U_USE, U_USW] Ns = [t3d.rotate(X, axis=0, steps=1) for X in Us] # N7. N1, N3, N5 Ws = [t3d.rotate(X, axis=1, steps=1) for X in Us] # W3, W1, W7, W5 UNWs = Us + Ns + Ws DSEs = [t3d.reflect(X) for X in UNWs] return UNWs + DSEs
[docs] def U0(cube): return (cube[1,1,1] & cube[1,1,2] & (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & (not cube[0,1,1]) & (not cube[2,1,1]) & (not cube[0,2,1]) & (not cube[1,2,1]) & (not cube[2,2,1]))
[docs] def U1(cube): return (cube[1,1,1] & cube[1,1,2] & (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & (not cube[0,1,1]) & (not cube[2,1,1]) & (cube[0,2,1]) & (not cube[1,2,1]) & (not cube[2,2,1]) & (cube[0,2,2]))
[docs] def U2(cube): return (cube[1,1,1] & cube[1,1,2] & (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & (not cube[0,1,1]) & (not cube[2,1,1]) & (not cube[0,2,1]) & (cube[1,2,1]) & (not cube[2,2,1]) & (cube[1,2,2]))
[docs] def R2(cube): return (cube[1,1,1] & cube[1,2,2] & (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & (not cube[0,1,1]) & (not cube[2,1,1]) & (not cube[0,0,2]) & (not cube[1,0,2]) & (not cube[2,0,2]))
#def S3_old(cube): # return (cube[1,1,1] & cube[0,2,2] & # (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & # (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & # (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & # (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & # (not cube[2,1,1]) & # (not cube[2,1,2]) & # (not cube[2,2,0]) & (not cube[2,2,1]) & (not cube[2,2,2])); #
[docs] def S3(cube): return (cube[1,1,1] & cube[0,2,2] & (not cube[0,0,0]) & (not cube[1,0,0]) & (not cube[2,0,0]) & (not cube[0,1,0]) & (not cube[1,1,0]) & (not cube[2,1,0]) & (not cube[0,2,0]) & (not cube[1,2,0]) & (not cube[2,2,0]) & (not cube[0,0,1]) & (not cube[1,0,1]) & (not cube[2,0,1]) & (not cube[2,1,1]) & (not cube[2,2,1]) & (not cube[0,0,2]) & (not cube[1,0,2]) & (not cube[2,0,2]) & (not cube[2,1,2]) & (not cube[2,2,2]))
[docs] def cube_to_smoothing(cube): """Match cube configurations to delete, add or keep a voxel.""" ### Delete center voxel: if cube[1,1,1]: # isolated or end if np.sum(cube) <= 2: return False # isolated voxels or voxels 'sticking out' for cr in rotations_faces(cube): if U0(cr): return False for cr in rotations_node_faces(cube): if U1(cr): return False if U2(cr): return False # voxels on edges for r in rotations_edges(cube): if R2(r): return False # voxels on nodes for r in rotations_nodes(cube): if S3(r): return False ### Add voxels if (not cube[1,1,1]): # mostly surrounded if np.sum(cube) >= 27-1-6: return True # 3 direct neighbours, 2 in line if ((cube[1,1,0] & cube[1,1,2]) or (cube[1,0,1] & cube[1,2,1]) or (cube[0,1,1] & cube[2,1,1])) and ( np.sum(cube[np.where(t3d.n6)]) >= 3): return True not_cube = np.logical_not(cube) # isolated voxels or voxels 'sticking out' for cr in rotations_faces(not_cube): if U0(cr): return True for cr in rotations_node_faces(not_cube): if U1(cr): return True if U2(cr): return True # voxels on edges for r in rotations_edges(not_cube): if R2(r): return True # voxels on nodes for r in rotations_nodes(not_cube): if S3(r): return True ### No change if cube[1,1,1]: return True else: return False
[docs] def index_to_smoothing(index, verbose = True): """Match index of configuration to smoothing action""" if verbose and index % 2**14 == 0: print('Smoothing LUT: %d / %d' % (index, 2**27)) cube = t3d.cube_from_index(index=index, center=None) return cube_to_smoothing(cube)
[docs] def generate_lookup_table(function = index_to_smoothing, verbose = True, processes = None): """Generates lookup table for templates""" if verbose: print('Smoothing: Generating look-up table!') if processes is None: processes = mp.cpu_count() if processes == 'serial': lut = [function(i) for i in range(2**27)] else: #import concurrent.futures as cf #with cf.ProcessPoolExecutor(max_workers=processes) as executor: # lut = executor.map(function, range(2**27)); pool = mp.Pool(mp.cpu_count()) lut = pool.map(function, range(2**27), chunksize=2**27//8//mp.cpu_count()) return np.array(lut, dtype = bool)
smooth_by_configuration_filename = "Smoothing.npy" """Filename for the look up table mapping a cube configuration to the smoothing action for the center pixel."""
[docs] def initialize_lookup_table(function = index_to_smoothing, filename = smooth_by_configuration_filename, verbose = True, processes = None): """Initialize the lookup table""" filename = os.path.join(os.path.dirname(os.path.abspath(__file__)), filename) #uncompress if only zip file exists. fu.uncompress(filename) #load lookup table if os.path.exists(filename): if verbose: print('Smoothing: Loading look-up table from %s!' % filename) return np.load(filename) else: if verbose: print('Smoothing: Look-up table does not exists! Pre-calculating it!') lut = generate_lookup_table(function = function, verbose=verbose, processes=processes) np.save(filename, lut) return lut
[docs] def smooth_by_configuration_block(source, iterations = 1, verbose = False): """Smooth a binary source using the local configuration around each pixel. Arguments --------- source : array The binary source to smooth. iterations : int Number of smoothing iterations. verbose : bool If True, print progress information. Returns ------- smoothed : array The smoothed binary array. """ try: if isinstance(source, io.src.Source): smoothed = source.array else: smoothed = source smoothed = np.asarray(smoothed, dtype='uint32') ndim = smoothed.ndim lut = np.asarray(initialize_lookup_table(verbose=verbose), dtype='uint32') for i in range(iterations): # index for axis in range(ndim): kernel = t3d.index_kernel(axis=axis) smoothed = ndi.correlate1d(smoothed, kernel, axis=axis, output='uint32', mode='constant', cval=0) smoothed = lut[smoothed] if verbose: print(f'Binary Smoothing: itertion {i+1} / {iterations} done!', flush=True) except Exception as err: print(err, flush=True) raise return np.asarray(smoothed, dtype=bool)
[docs] def smooth_by_configuration(source, sink = None, iterations = 1, processing_parameter = None, processes = None, verbose = False): """Smooth a binary source using the local configuration around each pixel. Arguments --------- source : array or Source The binary source to smooth. sink : array, Source or None The sink to write result of smoothing. If None, return array. iterations : int Number of smoothing iterations. processing_parameter : None or dict The parameter passed to :func:`ClearMap.ParallelProcessing.BlockProcessing.process`. processes : int or None number of processes to use. verbose : bool If True, print progress information. Returns ------- smoothed : array or Source Thre smoothed binary array. Note ---- The algorithm is based on a topological smoothing operation defined by adding or removing forground pixels based on the local topology of the binary array. """ if verbose: print('Binary smoothing: initialized!') timer = tmr.Timer() #smoothing function smooth = functools.partial(smooth_by_configuration_block, iterations=iterations, verbose=False) smooth.__name__ = 'smooth_by_configuration' #initialize sources and sinks source = io.as_source(source) sink = io.initialize(sink, shape=source.shape, dtype=bool, order=source.order) #block processing parameter block_processing_parameter = dict(axes = bp.block_axes(source), as_memory=True, overlap=None, function_type='source', processes=processes, verbose=verbose) if processing_parameter is not None: block_processing_parameter.update(processing_parameter) if not 'overlap' in block_processing_parameter or block_processing_parameter['overlap'] is None: # FIXME: use .get block_processing_parameter['overlap'] = 2 + 2 * iterations if not 'size_min' in block_processing_parameter or block_processing_parameter['size_min'] is None: block_processing_parameter['size_min'] = 2 + 2 * iterations + 1 if not 'axes' in block_processing_parameter or block_processing_parameter['axes'] is None: block_processing_parameter['axes'] = bp.block_axes(source) #print(block_processing_parameter) #block process bp.process(smooth, source, sink, **block_processing_parameter) if verbose: timer.print_elapsed_time('Binary smoothing: done') return sink
############################################################################### ### Testing ############################################################################### def _test(): import numpy as np import ClearMap.ImageProcessing.Binary.Smoothing as sm #reload(sm) lut = sm.initialize_lookup_table() #analysis:ignore #lut = sm.generate_lookup_table(verbose=True); shape = (30,40,50) binary = np.zeros(shape, dtype = bool, order='F') grid = np.meshgrid(*[range(s) for s in shape], indexing='ij') center = tuple(s/2 for s in shape) distance = np.sum([(g-c)**2 for g,c in zip(grid, center)], axis=0) binary[distance <= 10**2] = True import scipy.ndimage as ndi border = ndi.convolve(np.asarray(binary, dtype=int), np.ones((3,3,3))) border = np.logical_and(border > 0, border < 27) noisy = binary.copy() noisy[np.logical_and(border, np.random.rand(*shape) > 0.925)] = True smoothed = sm.smooth_by_configuration(noisy, iterations=3, verbose=True, processes='serial') import ClearMap.Visualization.Plot3d as p3d p3d.plot([noisy, smoothed]) #import scipy.io as sio #sio.savemat('binary_noisy.mat', {'noisy' : noisy}) #sio.savemat('binary_smoothed.mat', {'smoothed' : smoothed.array}) #noisy = sio.loadmat('binary_noisy.mat')['noisy']