Source code for ClearMap.ParallelProcessing.SharedMemoryManager
# -*- coding: utf-8 -*-
"""
SharedMemoryManager
===================
Shared memory array manager for parallel processing using ctype shared
arrays in :mod:`~ClearMap.ParallelProcessing.SharedMemoryArray`.
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__ = 'GPLv3 - GNU General Pulic License v3 (see LICENSE)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'http://idisco.info'
__download__ = 'http://www.github.com/ChristophKirst/ClearMap2'
import multiprocessing as mp
import ClearMap.ParallelProcessing.SharedMemoryArray as sma
__all__ = ['get', 'insert', 'free', 'clean', 'zeros'];
###############################################################################
### Manager
###############################################################################
class SharedMemmoryManager(object):
"""SharedMemmoryManager provides handles to shared arrays for parallel processing."""
_instance = None
"""Pointer to global instance"""
__slots__ = ['arrays', 'current', 'count', 'lock'];
def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super(SharedMemmoryManager, cls).__new__(cls, *args, **kwargs)
return cls._instance
def __init__(self):
self.arrays = [None] * 32
self.current = 0
self.count = 0
self.lock = mp.Lock()
def handle(self):
# double size if necessary
if (self.count >= len(self.arrays)):
self.arrays = self.arrays + [None] * len(self.arrays);
# find free handle
while self.arrays[self.current] is not None:
self.current = (self.current + 1) % len(self.arrays)
return self.current;
@staticmethod
def instance():
if not SharedMemmoryManager._instance:
SharedMemmoryManager._instance = SharedMemmoryManager()
return SharedMemmoryManager._instance
@staticmethod
def zeros(shape, dtype = None, order = None):
self = SharedMemmoryManager.instance()
self.lock.acquire()
# next handle
self.handle()
# create array in shared memory segment and wrap with numpy
self.arrays[self.current] = sma.zeros(shape, dtype, order);
# update cnt
self.count += 1
self.lock.release()
return self.current;
@staticmethod
def insert(array):
self = SharedMemmoryManager.instance()
# next handle
self.handle()
# convert to shared array and insert into handle
self.arrays[self.current] = sma.as_shared(array);
# update cnt
self.count += 1
return self.current
@staticmethod
def free(hdl):
self = SharedMemmoryManager.instance()
self.lock.acquire()
# set reference to None
if self.arrays[hdl] is not None: # consider multiple calls to free
self.arrays[hdl] = None
self.count -= 1
self.lock.release()
@staticmethod
def clean():
self = SharedMemmoryManager.instance()
self.lock.acquire()
for i in range(len(self.arrays)):
self.arrays[i] = None;
self.current = 0
self.count = 0
self.lock.release()
@staticmethod
def get(i):
self = SharedMemmoryManager.instance()
return self.arrays[i]
###############################################################################
### Functionality
###############################################################################
[docs]
def zeros(shape, dtype = None, order = None):
"""Creates a shared zero array and inserts it into the shared memory manager.
Arguments
---------
shape : tuple
Shape of the array.
dtype : dtype or None
The type of the array.
order : 'C', 'F', or None
The contiguous order of the array.
Returns
-------
handle : int
The handle to this array.
"""
return SharedMemmoryManager.zeros(shape=shape, dtype=dtype, order=order)
[docs]
def get(handle):
"""Returns the array in the shared memory manager with given handle.
Arguments
---------
handle : int
Shared memory handle of the array.
Returns
-------
array : array
The shared array with the specified handle.
"""
return SharedMemmoryManager.get(handle)
[docs]
def insert(array):
"""Inserts the array in the shared memory manager.
Arguments
---------
array : array
The array to insert into the shared memory manager.
Returns
-------
handle : int
The shared array handle.
"""
return SharedMemmoryManager.insert(array)
[docs]
def free(handle):
"""Removes the array with given handle from the shared memory manager.
Arguments
---------
handle : int
Shared memory handle of the array.
"""
SharedMemmoryManager.free(handle)
[docs]
def clean():
"""Removes all references to the shared arrays."""
SharedMemmoryManager.clean()
###############################################################################
### Tests
###############################################################################
def _test():
#from importlib import reload
import ClearMap.ParallelProcessing.SharedMemoryManager as smm
reload(smm)
def propagate(t):
i, hdl = t
a = smm.get(hdl)
#if i % 100000 == 0:
# print('i=%d' % i)
for j in range(1):
a[i] = i
n = 5000000;
hdl = smm.zeros(n, dtype=float)
print(hdl)
pool = smm.mp.Pool(processes=2)
smm.mp.Process()
pp = pool.map_async(propagate, zip(range(n), [hdl] * n)); #analysis:ignore
pool.close()
pool.join();
result = smm.get(hdl)
print(result)
smm.sma.is_shared(result)
smm.free(hdl)
smm.clean()