# -*- coding: utf-8 -*-

Module to process data in parallel for large data sets

This strategy allows memory intensive processing of larger data sets.


>>> import numpy as np
>>> import ClearMap.IO.IO as io
>>> import ClearMap.ParallelProcessing.BlockProcessing as bp
>>> source = io.as_source(np.asarray(np.random.rand(50,100,200), order = 'F'))
>>> blocks = bp.split_into_blocks(source, processes=10, axes=[2], size_min=30, size_max=50, overlap=20);
>>> blocks[0]
Block-Numpy-Source(50, 100, 38)[float64]|F|

>>> blocks[0].info()
'0/10<(0, 0, 0)/(1, 1, 10)> (50, 100, 38)@(50, 100, 200)[(:,:,0:38)]'

>>> b.valid
'Sliced-Block-Numpy-Source(50, 100, 28)[float64]|F|'

>>> b = blocks[0];
>>> print(b.valid.base_shape)
>>> print(b.valid.base_slicing)
>>> print(b.iteration)
(50, 100, 200)
(slice(None, None, None), slice(None, None, None), slice(None, 28, None))
>>> shape = (2,3,20);
>>> source = io.npy.Source(array = np.random.rand(*shape));
>>> sink = io.npy.Source(array = np.zeros(shape))
>>> def process_image(source, sink=None):
>>>    if sink is None:
>>>      sink = np.zeros(source.shape);
>>>    sink[:] = 100 * source[:];
>>>    return sink;
>>> bp.process(process_image, source, sink,
>>>            processes = 'serial', size_max = 4, size_min = 1, overlap = 0, axes = [2],
>>>            optimization = True, verbose = True);
>>> print(np.all(sink[:] == process_image(source)))
>>> bp.process(process_image, source, sink,
>>>            processes = None, size_max = 10, size_min = 6, overlap = 3, axes = 'all',
>>>            optimization = True, verbose = True);

import functools as ft
import multiprocessing as mp
import concurrent.futures as cf
import warnings

import numpy as np
import gc

import ClearMap.ParallelProcessing.Block as blk
import ClearMap.ParallelProcessing.ParallelTraceback as ptb

import ClearMap.IO.IO as io
import ClearMap.IO.SMA as sma

import ClearMap.Utils.Timer as tmr

### Default parameter
from ClearMap.Utils.utilities import CancelableProcessPoolExecutor

default_size_max = None
"""Default maximal size of a block.

Set this to limit the maximal block sizes automatically if size_max passed 
to :func:`ClearMap.ParallelProcessing.BlockProcessing.process` is None.

If this is None, the full source size will be used.

default_size_min = None
"""Default minimal size of a block.

Set this to limit the minimal block sizes automatically if size_min passed 
to :func:`ClearMap.ParallelProcessing.BlockProcessing.process` is None.

If this is None, the full source size will be used.

default_overlap = None
"""Default overlap between blocks.

This value is used if overlap passed 
to :func:`ClearMap.ParallelProcessing.BlockProcessing.process` is None.

If this is None, a zero overlap will be used.

### Processing

[docs] def process(function, source, sink = None, axes = None, size_max = None, size_min = None, size_multiple_of = None, overlap = None, optimization = True, optimization_fix = 'all', neighbours = False, function_type = None, as_memory = False, return_result = False, return_blocks = False, processes = None, verbose = False, workspace=None, **kwargs): """Create blocks and process a function on them in parallel. Arguments --------- function : function The main data processing script. source : str, Source, or list The source or list of sources to apply a function to sink : str, Source, list, or None The sink or list of sinks to write the result to. If None, return single array. axes : int, list of ints, or None Axes along which to split the source. If None, the splitting is determined automaticlly from the order of the array. size_max : int, list of ints or None Maximal size of a block along the axes. If None, :const:`default_size_max` is used. size_min : int or list of ints Minial size of a block along the axes. If None, :const:`default_size_min` is used. overlap : int, list of ints or None Minimal overlap between blocks along the axes. If None, :const:`default_overlap` is used. optimization : bool or list of bools If True, optimize block sizes to best fit number of processes. optimization_fix : 'increase', 'decrease', 'all' or None or list Increase, decrease or optimally change the block size when optimization is active. neighbours : bool If True, also include information about the neighbourhood in the blocks. function_type : 'array', 'source', 'block' or None The function type passed. If None, 'array' is used. * 'array' Reading and writing the valid slices from the blocks is automatic and the function gets passed numpy arrays. * 'source' Reading and writing the valid slices from the blocks is automatic and the function gets passed Source classes as inputs. * 'block' The function is assumed to act on and update blocks itself. as_memory : bool If True, load full blocks into memory before applying the function. Can be useful to reduce frequent reading and writing operations of memmaps. return_result : bool If True, return the results of the proceessing functions. return_blocks : bool If True, return the block information used to distribute the processing. processes : int, None The number of parallel processes, if 'serial', use serial processing. verbose : bool Print information on sub-stack generation. Returns ------- sink : str, Source, list or array The results of the processing. Note ---- This implementation only supports processing into sinks with the same shape as the source. """ #sources and sinks if isinstance(source, list): sources = source else: sources = [source] sources = [io.as_source(s).as_virtual() for s in sources] #if sink is None: # sink = sma.Source(shape=sources[0].shape, dtype=sources[0].dtype, order=sources[0].order); if isinstance(sink, list): sinks = sink elif sink is None: sinks = [] else: sinks = [sink] sinks = [io.initialize(s, hint=sources[0]) for s in sinks] sinks = [io.as_source(s).as_virtual() for s in sinks] axes = block_axes(sources[0], axes=axes) split = ft.partial(split_into_blocks, processes=processes, axes=axes, size_max=size_max, size_min=size_min, size_multiple_of=size_multiple_of, overlap=overlap, optimization=optimization, optimization_fix=optimization_fix, neighbours=neighbours, verbose=False) source_blocks = [split(s) for s in sources] sink_blocks = [split(s) for s in sinks] n_blocks = len(source_blocks[0]) source_blocks = [[blocks[i] for blocks in source_blocks] for i in range(n_blocks)] sink_blocks = [[blocks[i] for blocks in sink_blocks] for i in range(n_blocks)] if function_type is None: function_type = 'array' if function_type == 'block': func = ft.partial(process_block_block, function=function, as_memory=as_memory, return_result=return_result, verbose=verbose, **kwargs) elif function_type == 'source': func = ft.partial(process_block_source, function=function, as_memory=as_memory, as_array=False, verbose=verbose, **kwargs) elif function_type == 'array': func = ft.partial(process_block_source, function=function, as_memory=as_memory, as_array=True, verbose=verbose, **kwargs) else: raise ValueError("function type %r not 'array', 'source', 'block' or None!") if not isinstance(processes, int) and processes != "serial": processes = mp.cpu_count() if verbose: timer = tmr.Timer() print("Processing %d blocks with function %r." % (n_blocks, function.__name__)) if isinstance(processes, int): #from bounded_pool_executor import BoundedProcessPoolExecutor #with BoundedProcessPoolExecutor(max_workers=processes) as executor: #, source_blocks, sink_blocks) with CancelableProcessPoolExecutor(max_workers=processes) as executor: if workspace is not None: workspace.executor = executor futures = [executor.submit(func, *args) for args in zip(source_blocks, sink_blocks)] # res =, source_blocks, sink_blocks) result = [f.result() for f in futures] # To prevent keeping references to futures to avoid mem leaks # result = list(res) if workspace is not None: workspace.executor = None else: result = [func(*args) for args in zip(source_blocks, sink_blocks)] #analysis:ignore if verbose: timer.print_elapsed_time("Processed %d blocks with function %r" % (n_blocks, function.__name__)) #gc.collect(); if return_result: ret = result else: ret = sink if return_blocks: ret = (ret, [source_blocks, sink_blocks]) return ret
############################################################################### ### Helpers ###############################################################################
[docs] @ptb.parallel_traceback def process_block_source(sources, sinks, function, as_memory = False, as_array = False, verbose = False, **kwargs): """Process a block with full traceback. Arguments --------- sources : source specifications Sources passed to the function. sinks : sourcespecifications Sinks where data is written to. function func : function The function to call. """ if verbose: timer = tmr.Timer() print('Processing block %s' % (sources[0].info(),)) #sources = [s.as_real() for s in sources]; sources_input = sources if as_memory: sources = [s.as_memory() for s in sources] if as_array: sources = [s.array for s in sources] results = function(*sources, **kwargs) if not isinstance(results, (list, tuple)): results = [results] if len(sources_input) != len(sinks): sources_input = sources_input + [sources_input[0]] * (len(sinks) - len(sources)) for sink, source, result in zip(sinks, sources_input, results): #sink = sink.as_real(); sink.valid[:] = result[source.valid.slicing] if verbose: timer.print_elapsed_time('Processing block %s' % (sources_input[0].info(),)) gc.collect() return None
[docs] @ptb.parallel_traceback def process_block_block(sources, sinks, function, as_memory = False, return_result = False, verbose=False, **kwargs): """Process a block with full traceback. Arguments --------- sources : source specifications Sources passed to the function. sinks : sourcespecifications Sinks where data is written to. function func : function The function to call. """ if verbose: timer = tmr.Timer() print('Processing block %s' % (sources[0].info(),)) if as_memory: sinks_memory = [s.as_memory_block() for s in sinks] sources_and_sinks = [s.as_memory_block() for s in sources] + sinks_memory else: sources_and_sinks = sources + sinks result = function(*sources_and_sinks, **kwargs) if as_memory: for sink, sink_memory in zip(sinks, sinks_memory): sink.valid[:] = sink_memory.valid[:] if verbose: timer.print_elapsed_time('Processing block %s' % (sources[0].info(),)) gc.collect() if return_result: return result else: return None
############################################################################### ### Source splitting into blocks ###############################################################################
[docs] def block_sizes(size, processes = None, size_max = None, size_min = None, overlap = None, optimization = True, optimization_fix = 'all', verbose = False): """Calculates the block sizes along a single axis when splitting up a source . Arguments --------- size : int Size of the array dimension to be split up. processes : int Number of parallel processes to use. size_max : int or None. Maximal size of a block. If None, do not split. size_min : int, 'fixed', or None Minimal size of a block. If 'fixed' blocks will be of fixed size given by size_max and the overlap is increased if the last block is too small. If None, the minimal size is determined from the overlap. overlap : int or None Minimal overlap between blocks in a single axis. If None, the overlap defaults to zero. optimization : bool If True, optimize block sizes to best fit number of processes. optimization_fix : 'increase', 'decrease', 'all' or None Increase, decrease or optimally change the block size when optimization is active. verbose : bool Print information on block generation. Returns ------- n_blocks : int Number of blocks. block_ranges : list of tuple of ints Ranges of the blocks of the form [(lo0,hi0),(lo1,hi1),...]. valid_ranges : list of tuple of ints Valid ranges of the blocks of the form [(lo0,hi0),(lo1,hi1),...]. Note ---- The optimization allows block sizes to change slightly to better distribute the blocks over processes, assuming each block processes a similar amount of time. """ if processes is None: processes = mp.cpu_count() if not isinstance(processes, int): processes = 1 if processes <= 0: processes = 1 if size_max is None or size_max > size: size_max = size if overlap is None: overlap = 0 if overlap >= size: overlap = size - 1 fixed = False if size_min == 'fixed': size_min = size_max fixed = True optimization = False elif size_min is None: size_min = min(size_max, overlap + 1) if size_min > size: size_min = size #check consistency if size_min > size: raise RuntimeError('Minimal block size is larger than the data size %d > %d !' % (size_min, size)) if size_min > size_max: raise RuntimeError('Minimal block size larger than maximal block size %d > %d !' % (size_min, size_max)) if overlap >= size_max: raise ValueError('Overlap is larger than maximal block size: %d >= %d!' % (overlap, size_max)) if overlap >= size_min: raise ValueError('Overlap is larger than minimal block size: %d >= %d!' % (overlap, size_min)) #calcualte block size estimates block_size = size_max n_blocks = int(np.ceil(float(size - block_size) / (block_size - overlap) + 1)) if n_blocks <= 0: n_blocks = 1 if not fixed: block_size = float(size + (n_blocks-1) * overlap) / n_blocks if block_size < size_min: block_size = size_min if verbose: print("Estimated block size %d in %d blocks!" % (block_size, n_blocks)) if n_blocks == 1: return 1, [(0, size)], [(0, size)] #optimize number of blocks wrt to number of processors if optimization: n_add = n_blocks % processes if n_add != 0: if optimization_fix in [None, 'all', all]: if n_add < processes / 2.0: optimization_fix = 'increase' else: optimization_fix = 'decrease' if verbose: print("Optimizing block size to fit number of processes!") if optimization_fix == 'decrease': #try to deccrease block size / increase block number to fit distribution on processors n_blocks = n_blocks - n_add + processes block_size = float(size + (n_blocks-1) * overlap) / n_blocks if verbose: print("Optimized block size decreased to %d in %d blocks!" % (block_size, n_blocks)) elif optimization_fix == 'increase' and n_blocks > n_add: #try to increase chunk size and decrease chunk number to fit processors n_blocks = n_blocks - n_add block_size = float(size + (n_blocks-1) * overlap) / n_blocks if verbose: print("Optimized block size increased to %d in %d blocks!" % (block_size, n_blocks)) else: if verbose: print("Optimized block size %d unchanged in %d blocks!" % (block_size, n_blocks)) else: if verbose: print("Block size %d optimal in %d chunks!" % (block_size, n_blocks)) if block_size < size_min: #raise Warning("Warning: Some chunks with average chunk size %f.02 may be smaller than minima chunk size %d!" % (chunksize, sizeMin)); if verbose: print("Warning: Some blocks with average block size %.02f may be smaller than minimal block size %d due to optimization!" % (block_size, size_min)) if block_size > size_max: #raise Warning("Warning: optimized chunk size %f.02 is larger than maximum chunk size %d!" % (chunksize, sizeMax)); if verbose: print("Warning: Some blocks with average block size %.02f may be larger than maximum block size %d due to optimization!" % (block_size, size_max)) #calculate actual block sizes block_size_rest = block_size block_size = int(np.floor(block_size)) block_size_rest = block_size_rest - block_size block_ranges = [(0, block_size)] valid_ranges = [] valid_prev = 0 sr = block_size_rest hi = block_size n = 1 while n < n_blocks: n+=1 #range hi_prev = hi lo = hi - overlap hi = lo + block_size sr += block_size_rest if sr >= 1: sr -= 1 hi += 1 if n == n_blocks: hi = size if fixed: lo = hi - block_size block_ranges.append((lo, hi)) #borders valid = int(round((hi_prev - lo) / 2. + lo)) if valid > size: valid = size valid_ranges.append((valid_prev, valid)) valid_prev = valid valid_ranges.append((valid_prev, size)) if verbose: n_prt = min(10, n_blocks) if n_blocks > n_prt: pr = '...' else: pr = '' print("Final blocks : %d" % n_blocks) print("Final blocks : " + str(block_ranges[:n_prt]) + pr) print("Final borders: " + str(valid_ranges[:n_prt]) + pr) sizes = np.unique([r[1]- r[0] for r in block_ranges]) print("Final sizes : " + str(sizes)) return n_blocks, block_ranges, valid_ranges
[docs] def block_axes(source, axes=None): """ Determine the axes for block processing from source order. Arguments --------- source : array or Source The source on which the block processing is used. axes : list, 'all' or None The axes over which to split the block processing. .. deprecated:: 2.1 Value *all* (the built-in Python keyword) is now deprecated for parameter axes. You should replace it with *"all"* (the string literal) instead Returns ------- axes : list or None The axes over which to split the block processing. """ if axes == 'all' or axes is all: # FIXME: remove is all if axes is all: warnings.warn("Parameter axes could take all (the built-in Python keyword). " "This will be replaced by the string 'all' in future versions", DeprecationWarning, stacklevel=2) axes = [d for d in range(source.ndim)] if axes is not None: if np.max(axes) >= source.ndim or np.min(axes) < 0: raise ValueError(f'Axes specification {axes} for source with dimension {source.ndim} not valid!') return axes source = io.as_source(source) if source.order == 'F': axes = [source.ndim-1] else: axes = [0] return axes
[docs] def split_into_blocks(source, processes = None, axes = None, size_max = None, size_min = None, overlap = None, optimization = True, optimization_fix = 'all', neighbours = False, verbose = False, **kwargs): """splits a source into a list of Block sources for parallel processing. The block information is described in :mod:`ClearMapBlock` Arguments --------- source : Source Source to divide into blocks. processes : int Number of parallel processes to use. axes : int or list of ints or None Axes along which to split the source. If None, all axes are split. size_max : int or list of ints Maximal size of a block along the axes. size_min : int or list of ints Minial size of a block along the axes.. overlap : int or list of ints Minimal overlap between blocks along the axes. optimization : bool or list of bools If True, optimize block sizes to best fit number of processes. optimization_fix : 'increase', 'decrease', 'all' or None or list Increase, decrease or optimally change the block size when optimization is active. neighbours : bool If True, also include information about the neighbourhood in the blocks. verbose : bool Print information on block generation. Returns ------- blocks : list of Blocks List of Block classes dividing the source. """ shape = source.shape ndim = len(shape) axes = block_axes(source, axes=axes) n_axes = len(axes) size_max = _unpack(size_max, n_axes) size_min = _unpack(size_min, n_axes) overlap = _unpack(overlap, n_axes) optimization = _unpack(optimization, n_axes) optimization_fix = _unpack(optimization_fix, n_axes) #print size_max, size_min, overlap, optimization, optimization_fix #calculate block shapes blocks_shape = tuple() blocks_block_ranges = [] blocks_offsets = [] a = 0 for d in range(ndim): if d in axes: n_blocks, block_ranges, valid_ranges = \ block_sizes(shape[d], processes=processes, size_max=size_max[a], size_min=size_min[a], overlap=overlap[a], optimization=optimization[a], optimization_fix=optimization_fix[a], verbose=verbose) a += 1 else: n_blocks = 1 block_ranges = [(None, None)] valid_ranges = [(None, None)] #print(d, block_ranges, valid_ranges) offsets = [(v[0]-b[0], b[1]-v[1]) if b != (None, None) else (None, None) for b,v in zip(block_ranges, valid_ranges)] blocks_shape += (n_blocks,) blocks_block_ranges.append(block_ranges) blocks_offsets.append(offsets) #create blocks blocks_size = blocks = [] index_to_block = {} for i in range(blocks_size): index = np.unravel_index(i, blocks_shape) slicing = tuple(slice(b[0], b[1]) for b in [blocks_block_ranges[d][index[d]] for d in range(ndim)]) offsets = [(o[0], o[1]) for o in [blocks_offsets[d][index[d]] for d in range(ndim)]] block = blk.Block(source=source, slicing=slicing, offsets=offsets, index=index, blocks_shape=blocks_shape) blocks.append(block) if neighbours: index_to_block[index] = block if neighbours: for b in blocks: index = np.array(b.index) nbs = {} for d,i in enumerate(index): if i > 0: ii = index.copy(); ii[d] -= 1; ii = tuple(ii) nbs[ii] = index_to_block[ii] if i < blocks_shape[d] - 1: ii = index.copy(); ii[d] += 1; ii = tuple(ii) nbs[ii] = index_to_block[ii] b._neighbours = nbs return blocks
