Source code for ClearMap.Analysis.vasculature.flow.linear_system

from __future__ import division, print_function
import pickle
from itertools import chain

import numpy as np

from pyamg import rootnode_solver

from scipy.sparse import lil_matrix, linalg
from scipy.sparse.linalg import gmres

from pyximport import pyximport


import ClearMap.Analysis.vasculature.flow.units as units
pyximport.install(setup_args={"include_dirs": [np.get_include()]}, reload_support=True)
from ClearMap.Analysis.vasculature.flow.physiology import Physiology  # dot import for pyx

__all__ = ['LinearSystem']
defaultUnits = {'length': 'um', 'mass': 'ug', 'time': 'ms'}


# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------


[docs] class LinearSystem(object): def __init__(self, G, withRBC=0, invivo=0, dMin_empirical=3.5, htdMax_empirical=0.6, verbose=True, **kwargs): """ Computes the flow and pressure field of a vascular graph without RBC tracking. It can be chosen between pure plasma flow, constant hematocrit or a given htt/htd distribution. The pressure boundary conditions (pBC) should be given in mmHg and pressure will be output in mmHg Parameters ---------- G: igraph.Graph Vascular graph in iGraph format. withRBC: int 0: no RBCs, pure plasma Flow (default) 0 < withRBC < 1 & 'htt' not in edgeAttributes: the given value is assigned as htt to all edges. 0 < withRBC < 1 & 'htt' in edgeAttributes: the given value is assigned as htt to all edges where htt = None. invivo: boolean whether the invivo or invitro empirical functions are used (default = 0, i.e. in vitro) dMin_empiricial: float lower limit for the diameter that is used to compute nurel (effective viscosity). The aim of the limit is to avoid using the empirical equations in a range where no experimental data is available (default = 3.5). htdMax_empirical: float upper limit for htd that is used to compute nurel (effective viscosity). The aim of the limit is to avoid using the empirical equations in a range where no experimental data is available (default = 0.6). Maximum has to be 1. verbose: bool if WARNINGS and setup information is printed Returns ------- None, the edge properties htt is assigned and the function update is executed (see description for more details) """ self._G = G self._eps = np.finfo(float).eps print(self._eps) self._eps = 1e-5 self._P = Physiology(defaultUnits) self._muPlasma = self._P.dynamic_plasma_viscosity() self._withRBC = withRBC self._invivo = invivo self._verbose = verbose self._dMin_empirical = dMin_empirical self._htdMax_empirical = htdMax_empirical if self._verbose: print('INFO: The limits for the compuation of the effective viscosity are set to') print(f'Minimum diameter {self._dMin_empirical:.2f}') print(f'Maximum discharge {self._htdMax_empirical:.2f}') if self._withRBC != 0: if self._withRBC < 1.: if 'htt' not in G.es.attribute_names(): G.es['htt'] = [self._withRBC] * G.ecount() # G.ecount() --> total number of edges else: httNone = G.es(htt_eq=None).indices if len(httNone) > 0: G.es[httNone]['htt'] = [self._withRBC] * len(httNone) else: if self._verbose: print('WARNING: htt is already an edge attribute. \n Existing values are not overwritten!' + \ '\n If new values should be assigned htt has to be deleted beforehand!') else: print('ERROR: 0 < withRBC < 1') if 'rBC' not in G.vs.attribute_names(): # instead of pressure boundary conditions, in/outflow boundary conditions can be assigned (vertex attributed rBC) G.vs['rBC'] = [None] * G.vcount() if 'pBC' not in G.vs.attribute_names(): G.vs['pBC'] = [None] * G.vcount() self.update() # --------------------------------------------------------------------------
[docs] def update(self): """ Constructs the linear system A x = b where the matrix A contains the conductance information of the vascular graph, the vector b specifies the boundary conditions and the vector x holds the pressures at the vertices (for which the system needs to be solved). Returns ------- matrix A and vector b """ htt2htd = self._P.tube_to_discharge_hematocrit nurel = self._P.relative_apparent_blood_viscosity G = self._G # Convert 'pBC' ['mmHG'] to default Units # for v in G.vs(pBC_ne=None): # v['pBC']=v['pBC']*vgm.units.scaling_factor_du('mmHg',G['defaultUnits']) nVertices = G.vcount() b = np.zeros(nVertices) A = lil_matrix((nVertices, nVertices), dtype=float) # Compute nominal and specific resistance: self._update_nominal_and_specific_resistance() # if with RBCs compute effective resistance if self._withRBC: # using numpy notation: # dischargeHt = np.minimum(htt2htd(G.es['htt'], G.es['diameter'], self._invivo), 1.0) dischargeHt = [min(htt2htd(htt, d, self._invivo), 1.0) for htt, d in zip(G.es['htt'], G.es['diameter'])] G.es['htd'] = dischargeHt G.es['effResistance'] = [ res * nurel(max(self._dMin_empirical, d), min(dHt, self._htdMax_empirical), self._invivo) \ for res, dHt, d in zip(G.es['resistance'], dischargeHt, G.es['diameter'])] G.es['conductance'] = 1 / np.array(G.es['effResistance']) else: G.es['conductance'] = [1 / e['resistance'] for e in G.es] self._conductance = G.es['conductance'] for vertex in G.vs: i = vertex.index A.data[i] = [] A.rows[i] = [] b[i] = 0.0 if vertex['pBC'] is not None: A[i, i] = 1.0 b[i] = vertex['pBC'] else: aDummy = 0 k = 0 neighbors = [] for edge in G.incident(i, 'all'): # G.incident --> list of edges attached to vertex i if G.is_loop(edge): continue j = G.neighbors(i)[k] # G.neighbors --> list of vertices neighboring vertex i k += 1 conductance = G.es[edge]['conductance'] neighbor = G.vs[j] # +=, -= account for multiedges aDummy += conductance if neighbor['pBC'] is not None: b[i] = b[i] + neighbor['pBC'] * conductance else: if j not in neighbors: A[i, j] = - conductance else: A[i, j] = A[i, j] - conductance neighbors.append(j) if vertex['rBC'] is not None: b[i] += vertex['rBC'] A[i, i] = aDummy self._A = A self._b = b
# --------------------------------------------------------------------------
[docs] def solve(self, method, dest_file_path='sampledict.pkl', **kwargs): """ Solves the linear system A x = b for the vector of unknown pressures x, either using a direct solver (obsolete) or an iterative GMRES solver. From the pressures, the flow field is computed. Parameters ---------- method: str This can be either 'direct' or 'iterative2' dest_file_path: str The path to save the output file Returns ------- None - G is modified in place. G_final.pkl & G_final.vtp: are save as output sampledict.pkl: is saved as output """ # htt2htd = self._P.tube_to_discharge_hematocrit A = self._A.tocsr() if method == 'direct': linalg.use_solver(useUmfpack=True) x = linalg.spsolve(A, self._b) elif method == 'iterative2': ml = rootnode_solver(A, smooth=('energy', {'degree': 2}), strength='evolution') M = ml.aspreconditioner(cycle='V') # Solve pressure system # x,info = gmres(A, self._b, tol=self._eps, maxiter=1000, M=M) x, info = gmres(A, self._b, tol=10 * self._eps, M=M) if info != 0: print('ERROR in Solving the Matrix') print(info) else: raise ValueError(f'ERROR: Unknown method {method}! Choose either "direct" or "iterative2"!') G = self._G edges = np.array(G.get_edgelist()) G.vs['pressure'] = x self._x = x pressure = np.array(x) conductance = self._conductance G.es['flow'] = abs(pressure[edges[:, 0]] - pressure[edges[:, 1]]) * conductance # Default Units - mmHg for pressure skl = units.scaling_factor_du('mmHg', G['defaultUnits']) G.vs['pressure'] = pressure / skl # Fahraeus effect --> RBC flow velocity larger than bulk flow velocity if self._withRBC: # TODO: test from time import time start = time() e_htd = G.es['htd'] e_htt = G.es['htt'] e_flow = G.es['flow'] e_diam = G.es['diameter'] G.es['v'] = e_htd / e_htt * e_flow / (0.25 * np.pi * e_diam ** 2) end1 = time() d1 = end1 - start G.es['v'] = [e['htd'] / e['htt'] * e['flow'] / (0.25 * np.pi * e['diameter'] ** 2) for e in G.es] d2 = time() - end1 else: G.es['v'] = np.array(G.es['flow']) / (0.25 * np.pi * np.array(G.es['diameter']) ** 2) # Convert 'pBC' from default Units to mmHg pbc_not_none = G.vs(pBC_ne=None).indices G.vs[pbc_not_none]['pBC'] = np.array(G.vs[pbc_not_none]['pBC']) * ( 1 / units.scaling_factor_du('mmHg', G['defaultUnits'])) G.write_pickle('G_final.pkl') # vgm.write_pkl(G, 'G_final.pkl') # vgm.write_vtp(G, 'G_final.vtp',False) # Write Output sample_dict = { 'flow': G.es['flow'], 'v': G.es['v'], 'pressure': G.vs['pressure'] } with open(dest_file_path, 'wb') as fp: pickle.dump(sample_dict, fp, protocol=pickle.HIGHEST_PROTOCOL) return sample_dict
# -------------------------------------------------------------------------- def _update_nominal_and_specific_resistance(self, esequence=None): """Updates the nominal and specific resistance of a given edge sequence. Arguments --------- esequence: Sequence of edge indices as tuple. If not provided, all edges are updated. Returns ------- None, the edge properties 'resistance' and 'specificResistance' are updated (or created). """ G = self._G if esequence is None: es = G.es else: es = G.es(esequence) G.es['specificResistance'] = [128 * self._muPlasma / (np.pi * d ** 4) for d in G.es['diameter']] G.es['resistance'] = [l * sr for l, sr in zip(G.es['length'], G.es['specificResistance'])]