Source code for ClearMap.IO.NRRD

# encoding: utf-8
"""
NRRD
====

IO interface to NRRD volumetric image data files.

Note
----
  The interface is based on nrrd.py for reading and writing nrrd files.
  See 'this link <http://teem.sourceforge.net/nrrd/format.html>`_ for 
  specifications.
"""
__author__    = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__   = 'GPLv3 - GNU General Pulic License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__   = 'http://idisco.info'
__download__  = 'http://www.github.com/ChristophKirst/ClearMap2'

import os
import gzip
import bz2
import datetime 

import numpy as np

import ClearMap.IO.Source as src


###############################################################################
### Source class
###############################################################################

[docs] class Source(src.Source): """Nrrd array source.""" def __init__(self, location): """Nrrd source class constructor. Arguments --------- location : str The file nameof the nrrd source. """ self._location = location @property def name(self): return "Nrrd-Source" @property def location(self): return self._location @location.setter def location(self, value): if value != self.location: self._location = value @property def array(self): """The underlying data array. Returns ------- array : array The underlying data array of this source. """ return _array(self.location) @array.setter def array(self, value): _write_data(self.location, value) @property def shape(self): """The shape of the source. Returns ------- shape : tuple The shape of the source. """ return _shape(self.location) @shape.setter def shape(self, value): #TODO: fix raise NotImplementedError('Cannot set shape of nrrd file') @property def dtype(self): """The data type of the source. Returns ------- dtype : dtype The data type of the source. """ return _dtype(self.location) @dtype.setter def dtype(self, value): #TODO: fix raise NotImplementedError('Cannot set dtype of nrrd file') @property def order(self): """The order of how the data is stored in the source. Returns ------- order : str Returns 'C' for C contigous and 'F' for fortran contigous, None otherwise. """ return _order(self.location) @order.setter def order(self, value): #TODO: fix raise NotImplementedError('Cannot set order of nrrd file') @property def element_strides(self): """The strides of the array elements. Returns ------- strides : tuple Strides of the array elements. Note ---- The strides of the elements module itemsize instead of bytes. """ memmap = _memmap(self.location) return tuple(s // memmap.itemsize for s in memmap.strides) @property def offset(self): """The offset of the memory map in the file. Returns ------- offset : int Offset of the memeory map in the file. """ return _offset(self.location) ### Data def __getitem__(self, *args): memmap = _memmap(self.location) return memmap.__getitem__(*args) def __setitem__(self, *args): memmap = _memmap(self.location) memmap.__setitem__(*args)
[docs] def metadata(self, info = None): """Returns metadata from this nrrd file. Arguments --------- info : list or all Optional list of keywords, if all return full tif metadata, if None return default set info. Returns ------- metadata : dict Dictionary with the meta data. """ return _read_header(self.location)
[docs] def as_memmap(self): return _memmap(self.location)
[docs] def as_virtual(self): return VirtualSource(source = self)
[docs] def as_real(self): return self
[docs] def as_buffer(self): return self.as_memmap()
### Formatting def __str__(self): try: name = self.name name = '%s' % name if name is not None else '' except: name ='' try: shape = self.shape shape ='%r' % ((shape,)) if shape is not None else '' except: shape = '' try: dtype = self.dtype dtype = '[%s]' % dtype if dtype is not None else '' except: dtype = '' try: order = self.order order = '|%s|' % order if order is not None else '' except: order = '' # try: # memory = self.memory; # memory = '<%s>' % memory if memory is not None else ''; # except: # memory = ''; try: location = self.location location = '%s' % location if location is not None else '' if len(location) > 100: location = location[:50] + '...' + location[-50:] if len(location) > 0: location = '{%s}' % location except: location = '' return name + shape + dtype + order + location
[docs] class VirtualSource(src.VirtualSource): def __init__(self, source = None, shape = None, dtype = None, order = None, location = None, name = None): super(VirtualSource, self).__init__(source=source, shape=shape, dtype=dtype, order=order, location=location, name=name) if isinstance(source, Source): self.location = source.location @property def name(self): return 'Virtual-Nrrd-Source'
[docs] def as_virtual(self): return self
[docs] def as_real(self): return Source(location=self.location)
[docs] def as_buffer(self): return self.as_real().as_buffer()
############################################################################### ### IO Interface ###############################################################################
[docs] def is_nrrd(source): """Checks if this source is a NRRD source""" if isinstance(source, Source): return True if isinstance(source, str) and len(source) >= 4 and source[-4:] == 'nrrd': try: Source(source) except: return False return True return False
[docs] def read(source, slicing = None, **kwargs): """Read data from a nrrd file. Arguments --------- source : str The name of the nrrd file. slicing : slice, Slice or None An optional sub-slice to consider. Returns ------- array : array The image data in the tif file as a buffer. """ if not isinstance(source, Source): source = Source(source) if slicing is None: return source.array else: return source.__getitem__(slicing)
[docs] def write(sink, data, slicing = None, **kwargs): if isinstance(sink, Source): sink = sink.location if not isinstance(sink, str): raise ValueError('Invalid sink specification %r' % sink) if slicing is not None: memmap = _memmap(sink, mode='r+') memmap[slicing]= data return sink else: return _write(sink, data)
[docs] def create(location = None, shape = None, dtype = None, order = None, mode = None, array = None, as_source = True, **kwargs): raise NotImplementedError('Creating NRRD files not implemented yet!')
############################################################################### ### Reading ###############################################################################
[docs] class NrrdError(Exception): """Exceptions for Nrrd class.""" pass
def _convert_to_reproducible_floatingpoint( x ): #This will help prevent loss of precision #IEEE754-1985 standard says that 17 decimal digits is enough in all cases. if type(x) == float: value = '{:.16f}'.format(x).rstrip('0').rstrip('.') # Remove trailing zeros, and dot if at end else: value = str(x) return value _TYPEMAP_NRRD2NUMPY = { 'signed char': 'i1', 'int8': 'i1', 'int8_t': 'i1', 'uchar': 'u1', 'unsigned char': 'u1', 'uint8': 'u1', 'uint8_t': 'u1', 'short': 'i2', 'short int': 'i2', 'signed short': 'i2', 'signed short int': 'i2', 'int16': 'i2', 'int16_t': 'i2', 'ushort': 'u2', 'unsigned short': 'u2', 'unsigned short int': 'u2', 'uint16': 'u2', 'uint16_t': 'u2', 'int': 'i4', 'signed int': 'i4', 'int32': 'i4', 'int32_t': 'i4', 'uint': 'u4', 'unsigned int': 'u4', 'uint32': 'u4', 'uint32_t': 'u4', 'longlong': 'i8', 'long long': 'i8', 'long long int': 'i8', 'signed long long': 'i8', 'signed long long int': 'i8', 'int64': 'i8', 'int64_t': 'i8', 'ulonglong': 'u8', 'unsigned long long': 'u8', 'unsigned long long int': 'u8', 'uint64': 'u8', 'uint64_t': 'u8', 'float': 'f4', 'double': 'f8', 'block': 'V' } _TYPEMAP_NUMPY2NRRD = { 'i1': 'int8', 'u1': 'uint8', 'i2': 'int16', 'u2': 'uint16', 'i4': 'int32', 'u4': 'uint32', 'i8': 'int64', 'u8': 'uint64', 'f4': 'float', 'f8': 'double', 'V': 'block' } _NUMPY2NRRD_ENDIAN_MAP = { '<': 'little', 'L': 'little', '>': 'big', 'B': 'big' } def _parse_nrrdvector(inp): """Parse a vector from a nrrd header, return a list.""" assert inp[0] == '(', "Vector should be enclosed by parenthesis." assert inp[-1] == ')', "Vector should be enclosed by parenthesis." return [_convert_to_reproducible_floatingpoint(x) for x in inp[1:-1].split(',')] def _parse_optional_nrrdvector(inp): """Parse a vector from a nrrd header that can also be none.""" if (inp == "none"): return inp else: return _parse_nrrdvector(inp) _NRRD_FIELD_PARSERS = { 'dimension': int, 'type': str, 'sizes': lambda fieldValue: [int(x) for x in fieldValue.split()], 'endian': str, 'encoding': str, 'min': float, 'max': float, 'oldmin': float, 'old min': float, 'oldmax': float, 'old max': float, 'lineskip': int, 'line skip': int, 'byteskip': int, 'byte skip': int, 'content': str, 'sample units': str, 'datafile': str, 'data file': str, 'spacings': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'thicknesses': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'axis mins': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'axismins': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'axis maxs': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'axismaxs': lambda fieldValue: [_convert_to_reproducible_floatingpoint(x) for x in fieldValue.split()], 'centerings': lambda fieldValue: [str(x) for x in fieldValue.split()], 'labels': lambda fieldValue: [str(x) for x in fieldValue.split()], 'units': lambda fieldValue: [str(x) for x in fieldValue.split()], 'kinds': lambda fieldValue: [str(x) for x in fieldValue.split()], 'space': str, 'space dimension': int, 'space units': lambda fieldValue: [str(x) for x in fieldValue.split()], 'space origin': _parse_nrrdvector, 'space directions': lambda fieldValue: [_parse_optional_nrrdvector(x) for x in fieldValue.split()], 'measurement frame': lambda fieldValue: [_parse_nrrdvector(x) for x in fieldValue.split()], } _NRRD_REQUIRED_FIELDS = ['dimension', 'type', 'encoding', 'sizes'] # The supported field values _NRRD_FIELD_ORDER = [ 'type', 'dimension', 'space dimension', 'space', 'sizes', 'space directions', 'kinds', 'endian', 'encoding', 'min', 'max', 'oldmin', 'old min', 'oldmax', 'old max', 'content', 'sample units', 'spacings', 'thicknesses', 'axis mins', 'axismins', 'axis maxs', 'axismaxs', 'centerings', 'labels', 'units', 'space units', 'space origin', 'measurement frame', 'data file'] def _dtype_from_header(fields): """Determine the numpy dtype of the data.""" # Process the data type np_typestring = _TYPEMAP_NRRD2NUMPY[fields['type']] if np.dtype(np_typestring).itemsize > 1: if 'endian' not in fields: raise NrrdError('Nrrd header misses required field: "endian".') if fields['endian'] == 'big': np_typestring = '>' + np_typestring elif fields['endian'] == 'little': np_typestring = '<' + np_typestring return np.dtype(np_typestring) def _validate_magic_line(line): """For NRRD files, the first four characters are always "NRRD", and remaining characters give information about the file format version """ if not line.startswith('NRRD'): raise NrrdError('Missing magic "NRRD" word. Is this an NRRD file?') try: if int(line[4:]) > 5: raise NrrdError('NRRD file version too new for this library.') except: raise NrrdError('Invalid NRRD magic line: %s' % (line,)) return len(line) def _read_header(filename): """Parse the fields in the nrrd header nrrdfile can be any object which supports the iterator protocol and returns a string each time its next() method is called — file objects and list objects are both suitable. If csvfile is a file object, it must be opened with the ‘b’ flag on platforms where that makes a difference (e.g. Windows) >>> _read_header(("NRRD0005", "type: float", "dimension: 3")) {'type': 'float', 'dimension': 3, 'keyvaluepairs': {}} >>> _read_header(("NRRD0005", "my extra info:=my : colon-separated : values")) {'keyvaluepairs': {'my extra info': 'my : colon-separated : values'}} """ if isinstance(filename, str): nrrdfile = open(filename,'rb') else: nrrdfile = filename # Collect number of bytes in the file header (for seeking below) headerSize = 0 it = iter(nrrdfile) headerSize += _validate_magic_line(next(it).decode('ascii')) header = { 'keyvaluepairs': {} } for raw_line in it: headerSize += len(raw_line) raw_line = raw_line.decode('ascii') # Trailing whitespace ignored per the NRRD spec line = raw_line.rstrip() # Comments start with '#', no leading whitespace allowed if line.startswith('#'): continue # Single blank line separates the header from the data if line == '': break # Handle the <key>:=<value> lines first since <value> may contain a # ': ' which messes up the <field>: <desc> parsing key_value = line.split(':=', 1) if len(key_value) == 2: key, value = key_value # escape \\ and \n ?? # value.replace(r'\\\\', r'\\').replace(r'\n', '\n') header['keyvaluepairs'][key] = value continue # Handle the "<field>: <desc>" lines. field_desc = line.split(': ', 1) if len(field_desc) == 2: field, desc = field_desc ## preceeding and suffixing white space should be ignored. field = field.rstrip().lstrip() desc = desc.rstrip().lstrip() if field not in _NRRD_FIELD_PARSERS: raise NrrdError('Unexpected field in nrrd header: "%s".' % field) if field in header.keys(): raise NrrdError('Duplicate header field: "%s"' % field) header[field] = _NRRD_FIELD_PARSERS[field](desc) continue # Should not reach here raise NrrdError('Invalid header line: "%s"' % line) # Check whether the required fields are there for field in _NRRD_REQUIRED_FIELDS: if field not in header: raise NrrdError('Nrrd header misses required field: "%s".' % (field)) # line reading was buffered; correct file pointer to just behind header: nrrdfile.seek(headerSize) return header def _array(filename): """Read the actual data into a numpy array.""" with open(filename,'rb') as filehandle: fields = _read_header(filehandle) dtype = _dtype_from_header(fields) shape = fields['sizes'] order = 'F' #offset numPixels=np.prod(shape) datafilehandle = filehandle datafile = fields.get("datafile", fields.get("data file", None)) if datafile is not None: if os.path.isabs(datafile): datafilename = datafile else: datafilename = os.path.join(os.path.dirname(filename), datafile) datafilehandle = open(datafilename,'rb') if fields['encoding'] == 'raw': byteskip = fields.get('byteskip', fields.get('byte skip', 0)) if byteskip == -1: # This is valid only with raw encoding totalbytes = dtype.itemsize * numPixels datafilehandle.seek(-totalbytes, 2) else: lineskip = fields.get('lineskip', fields.get('line skip', 0)) for _ in range(lineskip): datafilehandle.readline() datafilehandle.read(byteskip) data = np.fromfile(datafilehandle, dtype) elif fields['encoding'] == 'gzip' or\ fields['encoding'] == 'gz': gzipfile = gzip.GzipFile(fileobj=datafilehandle) # Again, unfortunately, np.fromfile does not support # reading from a gzip stream, so we'll do it like this. data = np.fromstring(gzipfile.read(), dtype) elif fields['encoding'] == 'bzip2' or\ fields['encoding'] == 'bz2': bz2file = bz2.BZ2File(fileobj=datafilehandle) # Again, unfortunately, np.fromfile does not support # reading from a gzip stream, so we'll do it like this. data = np.fromstring(bz2file.read(), dtype) else: raise NrrdError('Unsupported encoding: "%s"' % fields['encoding']) if numPixels != data.size: raise NrrdError(f'ERROR: {numPixels}-{data.size}={numPixels - data.size}') data = np.reshape(data, shape, order=order) return data def _dtype(filename): """Determine data type from nrrd file.""" # read header with open(filename,'rb') as filehandle: fields = _read_header(filehandle) # Determine the data type from the fields dtype = fields['sizes'](fields) return dtype def _shape(filename): """Determine shape from nrrd file.""" #read header with open(filename,'rb') as filehandle: fields = _read_header(filehandle) return tuple(fields['sizes']) def _order(filename): """Determine shape from nrrd file.""" return 'F' def _offset(filename): """Offset of data in file.""" with open(filename,'rb') as filehandle: fields = _read_header(filehandle) #offset datafilehandle = filehandle datafile = fields.get("datafile", fields.get("data file", None)) if datafile is not None: if os.path.isabs(datafile): datafilename = datafile else: datafilename = os.path.join(os.path.dirname(filename), datafile) datafilehandle = open(datafilename,'rb') if fields['encoding'] == 'raw': byteskip = fields.get('byteskip', fields.get('byte skip', 0)) if byteskip == -1: # This is valid only with raw encoding numPixels=np.prod(fields['sizes']) dtype = _dtype_from_header(fields) totalbytes = dtype.itemsize * numPixels datafilehandle.seek(-totalbytes, 2) else: lineskip = fields.get('lineskip', fields.get('line skip', 0)) for _ in range(lineskip): datafilehandle.readline() datafilehandle.read(byteskip) return datafilehandle.tell() def _memmap(filename, mode = None): """Create memmap to the nrrd data.""" with open(filename,'rb') as filehandle: fields = _read_header(filehandle) if fields['encoding'] != 'raw': raise NrrdError('Cannot memmap to compressed file %r!' % fields['encoding']) dtype = _dtype_from_header(fields) shape = tuple(fields['sizes']) order = 'F' #datafile datafilename = filename datafilehandle = filehandle datafile = fields.get("datafile", fields.get("data file", None)) if datafile is not None: if os.path.isabs(datafile): datafilename = datafile else: datafilename = os.path.join(os.path.dirname(filename), datafile) datafilehandle = open(datafilename,'rb') #offset byteskip = fields.get('byteskip', fields.get('byte skip', 0)) if byteskip == -1: # This is valid only with raw encoding numPixels=np.prod(shape) totalbytes = dtype.itemsize * numPixels datafilehandle.seek(-totalbytes, 2) else: lineskip = fields.get('lineskip', fields.get('line skip', 0)) for _ in range(lineskip): datafilehandle.readline() datafilehandle.read(byteskip) offset = datafilehandle.tell() if mode is None: mode = 'r+' #print datafilename, dtype, mode, shape, order, offset return np.memmap(datafilename, dtype=dtype, mode=mode, offset=offset, shape=shape, order=order) ############################################################################### ### Writing ############################################################################### def _format_nrrd_list(fieldValue) : return ' '.join([_convert_to_reproducible_floatingpoint(x) for x in fieldValue]) def _format_nrrdvector(v) : return '(' + ','.join([_convert_to_reproducible_floatingpoint(x) for x in v]) + ')' def _format_optional_nrrdvector(v): if (v == 'none') : return 'none' else : return _format_nrrdvector(v) _NRRD_FIELD_FORMATTERS = { 'dimension': str, 'type': str, 'sizes': _format_nrrd_list, 'endian': str, 'encoding': str, 'min': str, 'max': str, 'oldmin': str, 'old min': str, 'oldmax': str, 'old max': str, 'lineskip': str, 'line skip': str, 'byteskip': str, 'byte skip': str, 'content': str, 'sample units': str, 'datafile': str, 'data file': str, 'spacings': _format_nrrd_list, 'thicknesses': _format_nrrd_list, 'axis mins': _format_nrrd_list, 'axismins': _format_nrrd_list, 'axis maxs': _format_nrrd_list, 'axismaxs': _format_nrrd_list, 'centerings': _format_nrrd_list, 'labels': _format_nrrd_list, 'units': _format_nrrd_list, 'kinds': _format_nrrd_list, 'space': str, 'space dimension': str, 'space units': _format_nrrd_list, 'space origin': _format_nrrdvector, 'space directions': lambda fieldValue: ' '.join([_format_optional_nrrdvector(x) for x in fieldValue]), 'measurement frame': lambda fieldValue: ' '.join([_format_optional_nrrdvector(x) for x in fieldValue]), } def _write_data(data, filehandle, options): # Now write data directly # rawdata = data.transpose([2,0,1]).tostring(order = 'C') # FIXME: this is only working for numpy arrays! compressors = { 'gzip': gzip, 'bz2': bz2 } encoding = options['encoding'] if encoding == 'raw': data.tofile(filehandle) elif encoding in compressors.keys(): with compressors[encoding].open(filename=filehandle, mode='wb') as compressor_file: compressor_file.write(data.tostring(order='F')) else: raise NrrdError(f'Unsupported encoding: {encoding}') def _write(filename, data, options={}, separate_header=False): """Write data to nrrd file. Arguments: filename (str): file name as regular expression data (array): image data options (dict): options dictionary separateHeader (bool): write a separate header file Returns: str: nrrd output file name To sample date use `options['spacings'] = [s1, s2, s3]` for 3d data with sampling deltas `s1`, `s2`, and `s3` in each dimension. """ dtype = data.dtype options['type'] = _TYPEMAP_NUMPY2NRRD[dtype.str[1:]] if dtype.itemsize > 1: options['endian'] = _NUMPY2NRRD_ENDIAN_MAP[dtype.str[:1]] # if 'space' is specified 'space dimension' can not. See http://teem.sourceforge.net/nrrd/format.html#space if 'space' in options.keys() and 'space dimension' in options.keys(): del options['space dimension'] options['dimension'] = data.ndim dsize = list(data.shape) options['sizes'] = dsize # The default encoding is 'raw' if 'encoding' not in options: options['encoding'] = 'raw' # A bit of magic in handling options here. # If *.nhdr filename provided, this overrides `separate_header=False` # If *.nrrd filename provided AND separate_header=True, separate files written. # For all other cases, header & data written to same file. if filename[-5:] == '.nhdr': separate_header = True if 'data file' not in options: datafilename = filename[:-4] + str('raw') if options['encoding'] == 'gzip': datafilename += '.gz' options['data file'] = datafilename else: datafilename = options['data file'] elif filename[-5:] == '.nrrd' and separate_header: separate_header = True datafilename = filename filename = filename[:-4] + str('nhdr') else: # Write header & data as one file datafilename = filename separate_header = False with open(filename,'wb') as filehandle: filehandle.write(b'NRRD0005\n') filehandle.write(b'# This NRRD file was generated by pynrrd\n') filehandle.write(b'# on ' + datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S').encode('ascii') + b'(GMT).\n') filehandle.write(b'# Complete NRRD file format specification at:\n') filehandle.write(b'# http://teem.sourceforge.net/nrrd/format.html\n') # Write the fields in order, this ignores fields not in _NRRD_FIELD_ORDER for field in _NRRD_FIELD_ORDER: if field in options: outline = (field + ': ' + _NRRD_FIELD_FORMATTERS[field](options[field]) + '\n').encode('ascii') filehandle.write(outline) d = options.get('keyvaluepairs', {}) for (k,v) in sorted(d.items(), key=lambda t: t[0]): outline = (str(k) + ':=' + str(v) + '\n').encode('ascii') filehandle.write(outline) # Write the closing extra newline filehandle.write(b'\n') # If a single file desired, write data if not separate_header: _write_data(data, filehandle, options) # If separate header desired, write data to different file if separate_header: with open(datafilename, 'wb') as datafilehandle: _write(data, datafilehandle, options) return filename ############################################################################### ### Tests ############################################################################### def _test(): import os import numpy as np import ClearMap.IO.NRRD as NRRD data = np.random.rand(20,50,10) data[5:15, 20:45, 2:9] = 0 filename = 'test.nrrd' NRRD.write(filename, data) check = NRRD.read(filename) print(np.all(data==check)) os.remove(filename)