# -*- coding: utf-8 -*-
"""
SharedMemoryArray
=================
Shared ctype memory arrays.
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Pulic License v3 (see LICENSE)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'http://idisco.info'
__download__ = 'http://www.github.com/ChristophKirst/ClearMap2'
import numpy as np
import multiprocessing as mp
__all__ = ['ctype', 'base', 'empty', 'zeros', 'zeros_like', 'ones']
###############################################################################
### Functionality
###############################################################################
[docs]
def ctype(dtype):
"""Determine ctype from array or dtype for ctype array construction
Arguments
---------
dtype : array or dtype
The array or data type to determine the c type from.
Returns
-------
ctype : str
The c-type correspinding to the array or dtype.
"""
#get dtype in case argument is a ndarray
if isinstance(dtype, np.ndarray):
dtype = dtype.dtype;
#convert to typestr
a = np.empty(1, dtype = dtype);
if a.dtype == bool:
a = a.astype('uint8');
#typestr = np.ctypeslib.as_ctypes(a[0]).__array_interface__['typestr'];
typestr = a.__array_interface__['typestr'];
#map to ctype
try:
#numpy 1.14
ct = np.ctypeslib._typecodes[typestr];
except:
#numpy 1.16
ct = np.ctypeslib._ctype_from_dtype_scalar(np.dtype(typestr));
return ct;
[docs]
def base(array):
"""Return the underlying multiprocessing shared raw array from a shared numpy array
Arguments
---------
array : array
Shared array.
Returns
-------
array : array
The raw shared memory base array.
"""
try:
return array.base.base;
except:
raise RuntimeError('Array has no shared base');
def array(shape, dtype = None, order = None):
"""Create a shared array wrapped in numpy array."""
if dtype is None:
dtype = float;
if order is None:
order = 'A';
#create shared memory
shared = mp.RawArray(ctype(dtype), int(np.prod(shape)));
#wrap with numpy array and reshape
array = np.frombuffer(shared, dtype=dtype);
array = array.reshape(shape, order=order);
return array;
[docs]
def empty(shape, dtype = None, order = None):
"""Creates a empty shared memory array with numpy wrapper
Arguments
---------
shape : tuple of ints
The shape of the shared memory array to create.
dtype : array or dtype
The array or data type to determine the c type from, if None float is used.
order : C', 'F', or None
The order of the array.
Returns
-------
array : array
A shared memory array wrapped as ndarray.
"""
return array(shape=shape, dtype=dtype, order=order);
[docs]
def zeros(shape, dtype = None, order = None):
"""Creates a shared memory array of zeros with numpy wrapper
Arguments
---------
shape : tuple of ints
The shape of the shared memory array to create.
dtype : array or dtype
The array or data type to determine the c type from, if None float is used.
order : 'A', 'C', 'F', or None
The order of the array. If None, 'A' is used.
Returns
-------
array : array
A shared memory array wrapped as ndarray.
"""
return array(shape, dtype=dtype, order=order);
[docs]
def zeros_like(source, shape = None, dtype = None, order = None):
"""Creates a shared memory array with numpy wrapper using shape, dtype and order from source
Arguments
---------
source : array
The source array to use as template.
shape : tuple of ints
The shape of the shared memory array to create.
dtype : array or dtype
The array or data type to determine the c type from, if None float is used.
order : 'A', 'C', 'F', or None
The order of the array. If None, 'A' is used.
Returns
-------
array : array
A shared memory array wrapped as ndarray basedon the source array.
"""
if dtype is None:
dtype = source.dtype;
if shape is None:
shape = source.shape;
if order is None:
if np.isfortran(source):
order = 'F';
else:
order = 'C';
return array(shape, dtype=dtype, order=order);
[docs]
def ones(shape, dtype = None, order = None):
"""Creates a shared memory array of ones with numpy wrapper
Arguments
---------
shape : tuple of ints
The shape of the shared memory array to create.
dtype : array or dtype
The array or data type to determine the c type from, if None float is used.
order : 'A', 'C', 'F', or None
The order of the array. If None, 'A' is used.
Returns
-------
array : array
A shared memory array wrapped as ndarray.
"""
a = array(shape, dtype=dtype, order=order);
a[:] = 1;
return a;
def is_shared(array):
"""Returns True if array is a shared memory array
Arguments
---------
array : array
The array to check if it is shared.
Returns
-------
is_shared : bool
True if the array is a shared memory array.
"""
if not isinstance(array, np.ndarray):
return False;
try:
base = array.base
if base is None:
return False
elif type(base).__module__.startswith('multiprocessing.sharedctypes'):
return True
else:
return is_shared(base)
except:
return False
def as_shared(source, copy=False, order=None):
"""Convert array to a shared memory array
Arguments
---------
source : array
The source array to use as template.
copy : bool
If True, the data in source is copied.
order : C', 'F', or None
The order to use for an array if copied or not a shared array. If None, the order of the source is used.
Returns
-------
array : array
A shared memory array wrapped as ndarray based on the source array.
"""
# already a shared array ?
if not copy and is_shared(source):
return source
if order is None:
order = 'A';
a = array(shape=source.shape, dtype=source.dtype, order=order)
a[:] = source
return a;
###############################################################################
### Tests
###############################################################################
def _test():
#from importlib import reload
import numpy as np
import ClearMap.ParallelProcessing.SharedMemoryArray as sma
#reload(sma)
n = 10;
array = sma.zeros(n)
non_shared = np.zeros(n)
def propagate(arg):
i, a = arg
for j in range(1000):
array[i] = i
def propagate_non_shared(arg):
i, a = arg
for j in range(1000):
non_shared[i] = i
sma.is_shared(non_shared)
sma.is_shared(array)
pool = sma.mp.Pool(processes=4)
pp = pool.map(propagate, zip(range(n), [None] * n));
print(array)
pool = sma.mp.Pool(processes=4)
pp = pool.map(propagate_non_shared, zip(range(n), [None] * n)); #analysis:ignore
print(non_shared)