# -*- coding: utf-8 -*-
"""
Group Statistics
================

Statistics to test for significant changes between groups of voxelized and
labeled data.
"""
__author__ = 'Christoph Kirst <christoph.kirst.ck@gmail.com>, Sophie Skriabine <sophie.skriabine@icm-institute.org>, Charly Rousseau <charly.rousseau@icm-institute.org>'
__license__ = 'GPLv3 - GNU General Public License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__ = 'https://idisco.info'
__download__ = 'https://www.github.com/ChristophKirst/ClearMap2'

import math
import os

import numpy as np
import pandas as pd
import tifffile
from scipy import stats

import ClearMap.Analysis.Statistics.StatisticalTests as clearmap_stat_tests
from ClearMap.Alignment import Annotation as annotation
from ClearMap.Alignment.utils import get_all_structs
from ClearMap.Analysis.Statistics import MultipleComparisonCorrection as clearmap_FDR
from ClearMap.IO import IO as clearmap_io
from ClearMap.Utils.exceptions import GroupStatsError
from ClearMap.Utils.path_utils import is_density_file, find_density_file, find_cells_df, dir_to_sample_id
from ClearMap.Utils.utilities import make_abs
from ClearMap.processors.sample_preparation import init_preprocessor

colors = {  # REFACTOR: move to visualisation module
    'red': [255, 0, 0],
    'green': [0, 255, 0],
    'blue': [0, 0, 255]
}


def t_test_voxelization(group1, group2, signed=False, remove_nan=True, p_cutoff=None):
    """
    t-Test on differences between the individual voxels in group1 and group2.

    Arguments
    ---------
    group1, group2 : array of arrays
        The groups of voxelizations to compare.
    signed : bool
        If True, also return the direction of the changes as +1 or -1.
    remove_nan : bool
        Remove NaN values from the data.
    p_cutoff : None or float
        Optional cutoff for the p-values.

    Returns
    -------
    p_values : array
        The p-values for the group-wise comparison.
    """
    group1 = read_group(group1)
    group2 = read_group(group2)

    t_vals, p_vals = stats.ttest_ind(group1, group2, axis=0, equal_var=True)

    if remove_nan:
        p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)

    if p_cutoff is not None:
        p_vals = np.clip(p_vals, None, p_cutoff)

    if signed:
        return p_vals, np.sign(t_vals)
    else:
        return p_vals

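# A minimal usage sketch (hypothetical file paths; assumes each group is a list
# of voxelized density files with identical shapes):
#
#   group1 = ['/data/ctrl_1/density_counts.tif', '/data/ctrl_2/density_counts.tif']
#   group2 = ['/data/mut_1/density_counts.tif', '/data/mut_2/density_counts.tif']
#   p_vals, signs = t_test_voxelization(group1, group2, signed=True, p_cutoff=0.05)
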
# WARNING: needs clean up
def t_test_region_counts(counts1, counts2, signed=False, remove_nan=True, p_cutoff=None, equal_var=False):
    """t-Test on differences in counts of points in labeled regions"""
    t_vals, p_vals = stats.ttest_ind(counts1, counts2, axis=1, equal_var=equal_var)

    if remove_nan:
        p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)

    if p_cutoff is not None:
        p_vals = np.clip(p_vals, None, p_cutoff)

    if signed:
        return p_vals, np.sign(t_vals)
    else:
        return p_vals

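# A minimal sketch with synthetic data: counts are compared along axis=1, i.e.
# each row is a region and each column a sample, so the two arrays only need to
# agree on the number of rows.
#
#   rng = np.random.default_rng(0)
#   counts1 = rng.poisson(100, size=(10, 5))  # 10 regions x 5 samples
#   counts2 = rng.poisson(120, size=(10, 6))  # 10 regions x 6 samples
#   p_vals, signs = t_test_region_counts(counts1, counts2, signed=True)
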
# TODO: group sources in IO
def read_group(sources, combine=True, **args):
    """Turn a list of sources for data into a numpy stack.

    Arguments
    ---------
    sources : list of str or sources
        The sources to combine.
    combine : bool
        If True, combine the sources into an ndarray, otherwise return a list.

    Returns
    -------
    group : array or list
        The group data.
    """
    # check if stack already
    if isinstance(sources, np.ndarray):
        return sources

    # read the individual files
    group = []
    for f in sources:
        data = clearmap_io.as_source(f, **args).array
        data = np.reshape(data, (1,) + data.shape)
        group.append(data)

    if combine:
        return np.vstack(group)
    else:
        return group

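# Usage sketch (hypothetical paths; read_group accepts anything that
# clearmap_io.as_source understands):
#
#   stack = read_group(['/data/s1.tif', '/data/s2.tif'])                  # shape (2,) + image shape
#   arrays = read_group(['/data/s1.tif', '/data/s2.tif'], combine=False)  # list of arrays
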
def weights_from_precentiles(intensities, percentiles=[25, 50, 75, 100]):
    perc = np.percentile(intensities, percentiles)
    weights = np.zeros(intensities.shape)
    for p in perc:
        ii = intensities > p
        weights[ii] = weights[ii] + 1
    return weights

def __prepare_cumulative_data(data, offset):
    # FIXME: use better variable names
    # fill up the low count data
    n = np.array([x.size for x in data])
    nm = n.max()
    m = np.array([x.max() for x in data])
    mm = m.max()
    k = n.size
    if offset is None:
        # assume data starts at 0 !
        offset = mm / nm  # ideally, for the statistics, this should be mm + eps to have as little influence as possible
    datac = [x.copy() for x in data]
    return datac, k, m, mm, n, nm, offset


def __plot_cumulative_test(datac, m, plot):
    # test by plotting
    if plot:
        import matplotlib.pyplot as plt
        for i in range(m.size):
            datac[i].sort()
            plt.step(datac[i], np.arange(datac[i].size))


def __run_cumulative_test(datac, k, method):
    # FIXME: remove one letter variable names
    if method in ('KolmogorovSmirnov', 'KS'):
        if k == 2:
            s, p = stats.ks_2samp(datac[0], datac[1])
        else:
            raise RuntimeError('KolmogorovSmirnov only for 2 samples not %d' % k)
    elif method in ('CramervonMises', 'CM'):
        if k == 2:
            s, p = clearmap_stat_tests.test_cramer_von_mises_2_sample(datac[0], datac[1])
        else:
            raise RuntimeError('CramervonMises only for 2 samples not %d' % k)
    elif method in ('AndersonDarling', 'AD'):
        s, a, p = stats.anderson_ksamp(datac)
    else:
        raise ValueError('Unknown method %r' % method)
    return p, s

def test_completed_cumulatives(data, method='AndersonDarling', offset=None, plot=False):
    """Test if data sets have the same number / intensity distribution by adding max intensity
    counts to the smaller sized data sets and performing a distribution comparison test"""
    # idea: fill up data points to the same numbers at the high intensity values and use a KS test
    # cf. work in progress on thoroughly testing the differences in histograms
    datac, k, m, mm, n, nm, offset = __prepare_cumulative_data(data, offset)
    for i in range(m.size):
        if n[i] < nm:
            datac[i] = np.concatenate((datac[i], np.ones(nm - n[i], dtype=datac[i].dtype) * (mm + offset)))
    __plot_cumulative_test(datac, m, plot)
    return __run_cumulative_test(datac, k, method)

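# Sketch with synthetic intensities (two samples, so all three methods apply):
#
#   a = np.random.exponential(1.0, 500)
#   b = np.random.exponential(1.2, 400)  # fewer but brighter points
#   p, s = test_completed_cumulatives((a, b), method='KS')
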
def test_completed_inverted_cumulatives(data, method='AndersonDarling', offset=None, plot=False):
    """Test if data sets have the same number / intensity distribution by adding zero intensity
    counts to the smaller sized data sets and performing a distribution comparison test on the
    reversed cumulative distribution"""
    # idea: fill up data points to the same numbers at the high intensity values and use a KS test
    # cf. work in progress on thoroughly testing the differences in histograms
    datac, k, m, mm, n, nm, offset = __prepare_cumulative_data(data, offset)
    for i in range(m.size):
        if n[i] < nm:
            # FIXME: only these lines differ from test_completed_cumulatives above
            datac[i] = np.concatenate((-datac[i], np.ones(nm - n[i], dtype=datac[i].dtype) * offset))
        else:
            datac[i] = -datac[i]
    __plot_cumulative_test(datac, m, plot)
    return __run_cumulative_test(datac, k, method)

def remove_p_val_nans(p_vals, t_vals):
    invalid_idx = np.isnan(p_vals)
    p_vals_c = p_vals.copy()
    t_vals_c = t_vals.copy()
    p_vals_c[invalid_idx] = 1.0
    t_vals_c[invalid_idx] = 0
    return p_vals_c, t_vals_c

def stack_voxelizations(directory, f_list, suffix):
    """
    Stack a group's voxelizations along a 4th axis to simplify further processing.

    Parameters
    ----------
    directory : str
        The folder that the file paths and the output stack are relative to.
    f_list : list of str
        The paths of the density (voxelization) files to stack.
    suffix : str
        The group name, appended to the output file name.

    Returns
    -------
    stacked_voxelizations : np.ndarray
        4D float32 array with one 3D voxelization per index along the last axis.
    """
    for i, file_name in enumerate(f_list):
        img = clearmap_io.read(make_abs(directory, file_name))
        if i == 0:  # init on first image
            stacked_voxelizations = img[:, :, :, np.newaxis]
        else:
            stacked_voxelizations = np.concatenate((stacked_voxelizations, img[:, :, :, np.newaxis]), axis=3)
    stacked_voxelizations = stacked_voxelizations.astype(np.float32)
    try:
        clearmap_io.write(os.path.join(directory, f'stacked_density_{suffix}.tif'), stacked_voxelizations, bigtiff=True)
    except ValueError:
        pass
    return stacked_voxelizations

def average_voxelization_groups(stacked_voxelizations, directory, suffix, compute_sd=False):
    avg_voxelization = np.mean(stacked_voxelizations, axis=3)
    clearmap_io.write(os.path.join(directory, f'avg_density_{suffix}.tif'), avg_voxelization)
    if compute_sd:
        sd_voxelization = np.std(stacked_voxelizations, axis=3)
        clearmap_io.write(os.path.join(directory, f'sd_density_{suffix}.tif'), sd_voxelization)

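# The two helpers above are typically chained per group; a sketch with a
# hypothetical folder layout:
#
#   stacked = stack_voxelizations('/data/stats', ['s1/density.tif', 's2/density.tif'], suffix='ctrl')
#   average_voxelization_groups(stacked, '/data/stats', 'ctrl', compute_sd=True)
#   # writes stacked_density_ctrl.tif, avg_density_ctrl.tif and sd_density_ctrl.tif
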
# REFACTOR: move to visualisation module
def __validate_colors(positive_color, negative_color):
    if len(positive_color) != len(negative_color):
        raise ValueError(f'Length of positive and negative colors do not match, '
                         f'got {len(positive_color)} and {len(negative_color)}')


# REFACTOR: move to visualisation module
def color_p_values(p_vals, p_sign, positive_color=[1, 0], negative_color=[0, 1], p_cutoff=None,
                   positive_trend=[0, 0, 1, 0], negative_trend=[0, 0, 0, 1], p_max=None):
    """
    Parameters
    ----------
    p_vals : np.array
    p_sign : np.array
    positive_color : list
    negative_color : list
    p_cutoff : float
    positive_trend : list
    negative_trend : list
    p_max : float

    Returns
    -------
    colored_p_vals : np.array
    """
    if p_max is None:
        p_max = p_vals.max()
    p_vals_inv = p_max - p_vals

    if p_cutoff is None:  # color given p values
        __validate_colors(positive_color, negative_color)

        d = len(positive_color)  # 3D + color
        output_shape = p_vals.shape + (d,)
        colored_p_vals = np.zeros(output_shape)

        # coloring
        for neg, col in ((False, positive_color), (True, negative_color)):
            if neg:
                ids = p_sign < 0
            else:
                ids = p_sign > 0
            p_vals_i = p_vals_inv[ids]
            for i, channel_value in enumerate(col):  # [i] on R, G, B components
                colored_p_vals[ids, i] = p_vals_i * channel_value
    else:
        raise NotImplementedError('color_p_values: splitting by p_cutoff is currently disabled, '
                                  'see the commented-out branch below')
    # else:  # split p_values according to cutoff
    #     if any([len(positive_color) != len(v) for v in (negative_color, positive_trend, negative_trend)]):
    #         raise ValueError('color_p_values: positive, negative, positive_trend and '
    #                          'negative_trend option must be equal length !')
    #     output_shape = p_vals.shape + (len(positive_color),)
    #     colored_p_vals = np.zeros(output_shape)
    #
    #     idc = p_vals < p_cutoff
    #     ids = p_sign > 0
    #     # significant positive, non sig positive, sig neg, non sig neg
    #     for id_sign, idc_sign, w in ((1, 1, positive_color), (1, -1, positive_trend),
    #                                  (-1, 1, negative_color), (-1, -1, negative_trend)):
    #         if id_sign < 0:
    #             ids = np.logical_not(ids)
    #         if idc_sign < 0:
    #             idc = np.logical_not(idc)
    #         ii = np.logical_and(ids, idc)
    #         p_vals_i = p_vals_inv[ii]
    #         for i in range(len(w)):
    #             colored_p_vals[ii, i] = p_vals_i * w[i]
    return colored_p_vals

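# Sketch: a signed p-value map rendered as a 2-channel image (positive changes
# in channel 0, negative in channel 1), using synthetic inputs:
#
#   p = np.random.uniform(0.0, 0.05, (10, 10))
#   sign = np.sign(np.random.randn(10, 10))
#   rgb = color_p_values(p, sign, positive_color=[1, 0], negative_color=[0, 1])  # shape (10, 10, 2)
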
# REFACTOR: move to visualisation module
def get_colored_p_vals(p_vals, t_vals, significance, color_names):
    p_vals_f = np.clip(p_vals, None, significance)
    p_sign = np.sign(t_vals)
    return color_p_values(p_vals_f, p_sign,
                          positive_color=colors[color_names[0]],
                          negative_color=colors[color_names[1]])

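# The color names index the module-level `colors` dict, e.g.
#
#   colored = get_colored_p_vals(p_vals, t_vals, 0.05, ('red', 'green'))
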
def dirs_to_density_files(directory, f_list):
    out = []
    for f_name in f_list:
        f_name = make_abs(directory, f_name)
        if not is_density_file(f_name):
            f_name = find_density_file(f_name)
        out.append(f_name)
    return out

# def get_p_vals_f(p_vals, t_vals, p_cutoff):
#     p_vals2 = np.clip(p_vals, None, p_cutoff)
#     p_sign = np.sign(t_vals)
#     return p_vals2, p_sign

def group_cells_counts(struct_ids, group_cells_dfs, sample_ids, volume_map):
    """
    Parameters
    ----------
    struct_ids : list
    group_cells_dfs : list(pd.DataFrame)
    sample_ids : list
    volume_map : dict
        Maps each id from struct_ids to the corresponding structure's volume (in pixels).

    Returns
    -------
    output : pd.DataFrame
    """
    all_ints = False
    if all_ints:
        output = pd.DataFrame(columns=['id', 'hemisphere'] +
                                      [f'counts_{str(sample_ids[i]).zfill(2)}' for i in range(len(group_cells_dfs))])
    else:
        output = pd.DataFrame(columns=['id', 'hemisphere'] +
                                      [f'counts_{sample_ids[i]}' for i in range(len(group_cells_dfs))])

    output['id'] = np.tile(struct_ids, 2)  # for each hemisphere
    output['name'] = np.tile([annotation.find(id_, key='id')['name'] for id_ in struct_ids], 2)
    output['hemisphere'] = np.repeat((0, 255), len(struct_ids))  # FIXME: translate hemisphere to plain text
    output['volume'] = output.set_index(['id', 'hemisphere']).index.map(volume_map.get)
    output = output[output['volume'].notna()]

    for hem_id in (0, 255):
        for j, sample_df in enumerate(group_cells_dfs):
            if all_ints:
                col_name = f'counts_{str(sample_ids[j]).zfill(2)}'  # TODO: option with f'counts_{j}'
            else:
                col_name = f'counts_{sample_ids[j]}'
            hem_sample_df = sample_df[sample_df['hemisphere'] == hem_id]
            # FIXME: replace loop (slow)
            for struct_id in struct_ids:
                row_idx = output[(output['id'] == struct_id) & (output['hemisphere'] == hem_id)].index
                output.loc[row_idx, col_name] = len(hem_sample_df[hem_sample_df['id'] == struct_id])
    return output

def generate_summary_table(cells_dfs, p_cutoff=None):
    gp_names = list(cells_dfs.keys())
    grouped_counts = []
    total_df = pd.DataFrame({k: cells_dfs[gp_names[0]][k] for k in ('id', 'name', 'volume', 'hemisphere')})

    for i, gp_name in enumerate(gp_names):
        grouped_counts.append(pd.DataFrame())
        for col_name in cells_dfs[gp_name].columns:
            if 'count' in col_name:
                col = cells_dfs[gp_name][col_name]
                new_col_name = f'{gp_name}_{col_name}'
                total_df[new_col_name] = col
                grouped_counts[i][new_col_name] = col
        total_df[f'mean_{gp_name}'] = grouped_counts[i].mean(axis=1).astype(float)  # to avoid "object" dtype
        total_df[f'sd_{gp_name}'] = grouped_counts[i].std(axis=1).astype(float)  # to avoid "object" dtype

    total_df, grouped_counts = sanitize_df(gp_names, grouped_counts, total_df)

    gp1 = grouped_counts[0].values.astype(int)
    gp2 = grouped_counts[1].values.astype(int)
    p_vals, p_signs = t_test_region_counts(gp1, gp2, p_cutoff=p_cutoff, signed=True)

    total_df['p_value'] = p_vals
    total_df['q_value'] = clearmap_FDR.estimate_q_values(p_vals)
    total_df['p_sign'] = p_signs.astype(int)
    return total_df

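# Sketch of the expected input (two groups, since the t-test above is pairwise;
# the DataFrames come from group_cells_counts):
#
#   aggregated = {'ctrl': ctrl_counts_df, 'mut': mut_counts_df}
#   summary = generate_summary_table(aggregated, p_cutoff=0.05)
#   # one row per (id, hemisphere) with means, sd, p_value, q_value and p_sign
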
def sanitize_df(gp_names, grouped_counts, total_df):
    """
    Remove rows whose mean count is 0 or NaN in at least one group.

    Args:
        gp_names:
        grouped_counts:
        total_df:

    Returns:

    """
    bad_idx = total_df[f'mean_{gp_names[0]}'] == 0  # FIXME: check that either not and
    bad_idx = np.logical_or(bad_idx, total_df[f'mean_{gp_names[1]}'] == 0)
    bad_idx = np.logical_or(bad_idx, np.isnan(total_df[f'mean_{gp_names[0]}']))
    bad_idx = np.logical_or(bad_idx, np.isnan(total_df[f'mean_{gp_names[1]}']))
    return total_df[~bad_idx], [grouped_counts[0][~bad_idx], grouped_counts[1][~bad_idx]]

def dirs_to_cells_dfs(directory, dirs):
    out = []
    for f_name in dirs:
        f_name = make_abs(directory, f_name)
        if not f_name.endswith('cells.feather'):
            f_name = find_cells_df(f_name)
        out.append(pd.read_feather(f_name))
    return out

def get_volume_map(folder):
    preproc = init_preprocessor(folder)
    return annotation.annotation.get_lateralised_volume_map(
        preproc.processing_config['registration']['resampling']['autofluo_sink_resolution'],
        preproc.hemispheres_file_path
    )

# REFACTOR: move to separate module
def make_summary(directory, gp1_name, gp2_name, gp1_dirs, gp2_dirs, output_path=None, save=True):
    gp1_dfs = dirs_to_cells_dfs(directory, gp1_dirs)
    gp2_dfs = dirs_to_cells_dfs(directory, gp2_dirs)
    gp_cells_dfs = [gp1_dfs, gp2_dfs]
    structs = get_all_structs(gp1_dfs + gp2_dfs)

    gp1_sample_ids = [dir_to_sample_id(folder) for folder in gp1_dirs]
    gp2_sample_ids = [dir_to_sample_id(folder) for folder in gp2_dirs]
    sample_ids = [gp1_sample_ids, gp2_sample_ids]

    volume_map = get_volume_map(gp1_dirs[0])  # WARNING: hacky, uses the first sample's volume map for all samples

    aggregated_dfs = {gp_name: group_cells_counts(structs, gp_cells_dfs[i], sample_ids[i], volume_map)
                      for i, gp_name in enumerate((gp1_name, gp2_name))}
    total_df = generate_summary_table(aggregated_dfs)

    if output_path is None and save:
        output_path = os.path.join(directory, f'statistics_{gp1_name}_{gp2_name}.csv')
    if save:
        total_df.to_csv(output_path)
    return total_df

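# Typical call (hypothetical sample folders, given relative to the stats directory):
#
#   df = make_summary('/data/stats', 'ctrl', 'mut',
#                     ['ctrl_1', 'ctrl_2'], ['mut_1', 'mut_2'])
#   # also writes /data/stats/statistics_ctrl_mut.csv
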
def density_files_are_comparable(directory, gp1_dirs, gp2_dirs):
    gp1_f_list = dirs_to_density_files(directory, gp1_dirs)
    gp2_f_list = dirs_to_density_files(directory, gp2_dirs)
    all_files = gp1_f_list + gp2_f_list

    sizes = [os.path.getsize(f) for f in all_files]
    tolerance = 1024  # 1 KB
    comparable = all(math.isclose(s, sizes[0], abs_tol=tolerance) for s in sizes)
    if comparable:
        return True
    else:
        raise GroupStatsError(f'Could not compare files, sizes differ\n\n'
                              f'Group 1: {gp1_f_list}\n'
                              f'Group 2: {gp2_f_list}\n'
                              f'Sizes 1: {[os.path.getsize(f) for f in gp1_f_list]}\n'
                              f'Sizes 2: {[os.path.getsize(f) for f in gp2_f_list]}\n')

# REFACTOR: move to separate module
def compare_groups(directory, gp1_name, gp2_name, gp1_dirs, gp2_dirs, prefix='p_val_colors', advanced=True):
    gp1_f_list = dirs_to_density_files(directory, gp1_dirs)
    gp2_f_list = dirs_to_density_files(directory, gp2_dirs)

    gp1_stacked_voxelizations = stack_voxelizations(directory, gp1_f_list, suffix=gp1_name)
    average_voxelization_groups(gp1_stacked_voxelizations, directory, gp1_name, compute_sd=advanced)
    gp2_stacked_voxelizations = stack_voxelizations(directory, gp2_f_list, suffix=gp2_name)
    average_voxelization_groups(gp2_stacked_voxelizations, directory, gp2_name, compute_sd=advanced)

    t_vals, p_vals = stats.ttest_ind(gp1_stacked_voxelizations, gp2_stacked_voxelizations, axis=3, equal_var=False)
    p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)

    colored_p_vals_05 = get_colored_p_vals(p_vals, t_vals, 0.05, ('red', 'green'))
    colored_p_vals_01 = get_colored_p_vals(p_vals, t_vals, 0.01, ('green', 'blue'))
    colored_p_vals = np.maximum(colored_p_vals_05, colored_p_vals_01).astype(np.uint8)

    output_f_name = f'{prefix}_{gp1_name}_{gp2_name}.tif'
    output_file_path = os.path.join(directory, output_f_name)
    clearmap_io.write(output_file_path, colored_p_vals, photometric='rgb', imagej=True)

    if advanced:
        effect_size = np.abs(np.mean(gp1_stacked_voxelizations, axis=3).astype(int) -
                             np.mean(gp2_stacked_voxelizations, axis=3).astype(int))
        effect_size = effect_size.astype(np.uint16)  # for imagej compatibility
        output_f_name = f'effect_size_{gp1_name}_{gp2_name}.tif'
        output_file_path = os.path.join(directory, output_f_name)
        clearmap_io.write(output_file_path, effect_size, imagej=True)
    return colored_p_vals

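# Typical call producing the colored p-value map (hypothetical folders; the
# density files are located automatically via dirs_to_density_files):
#
#   density_files_are_comparable('/data/stats', ['ctrl_1', 'ctrl_2'], ['mut_1', 'mut_2'])
#   colored = compare_groups('/data/stats', 'ctrl', 'mut',
#                            ['ctrl_1', 'ctrl_2'], ['mut_1', 'mut_2'])
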
# def test_completed_cumulatives_in_spheres(points1, intensities1, points2, intensities2,
#                                           shape=annotation.default_annotation_file, radius=100,
#                                           method='AndersonDarling'):
#     """Performs completed cumulative distribution tests for each pixel using points in a ball
#     centered at that coordinate; returns 4 arrays: p value, statistic value, number in each group"""
#
#     # TODO: simple implementation -> slow -> speed up
#     if not isinstance(shape, tuple):
#         shape = clearmap_io.shape(shape)
#     if len(shape) != 3:
#         raise RuntimeError('Shape expected to be 3d, found %r' % (shape,))
#
#     # distances^2 to origin
#     x1 = points1[:, 0]; y1 = points1[:, 1]; z1 = points1[:, 2]; i1 = intensities1
#     d1 = x1 * x1 + y1 * y1 + z1 * z1
#
#     x2 = points2[:, 0]; y2 = points2[:, 1]; z2 = points2[:, 2]; i2 = intensities2
#     d2 = x2 * x2 + y2 * y2 + z2 * z2
#
#     r2 = radius * radius  # WARNING: inhomogeneous in 3d !
#
#     p = np.zeros(shape)
#     s = np.zeros(shape)
#     n1 = np.zeros(shape, dtype='int')
#     n2 = np.zeros(shape, dtype='int')
#
#     for x in range(shape[0]):
#         for y in range(shape[1]):
#             for z in range(shape[2]):
#                 d11 = d1 - 2 * (x * x1 + y * y1 + z * z1) + (x*x + y*y + z*z)
#                 d22 = d2 - 2 * (x * x2 + y * y2 + z * z2) + (x*x + y*y + z*z)
#
#                 ii1 = d11 < r2
#                 ii2 = d22 < r2
#
#                 n1[x, y, z] = ii1.sum()
#                 n2[x, y, z] = ii2.sum()
#
#                 if n1[x, y, z] > 0 and n2[x, y, z] > 0:
#                     pp, ss = test_completed_cumulatives((i1[ii1], i2[ii2]), method=method)
#                 else:
#                     pp = 0; ss = 0
#
#                 p[x, y, z] = pp
#                 s[x, y, z] = ss
#
#     return p, s, n1, n2


# ###############################################################################
# ### Tests
# ###############################################################################

# def _test():
#     """Test the statistics array"""
#     import numpy as np
#     import ClearMap.Analysis.Statistics.group_statistics as st
#
#     s = np.ones((5, 4, 20))
#     s[:, 0:3, :] = -1
#
#     x = np.random.rand(4, 4, 20)
#     y = np.random.rand(5, 4, 20) + s
#
#     pvals, psign = st.t_test_voxelization(x, y, signed=True)
#
#     pvalscol = st.color_p_values(pvals, psign, positive_color=[255, 0, 0], negative_color=[0, 255, 0])
#
#     import ClearMap.Visualization.Plot3d as p3d
#     p3d.plot(pvalscol)