def t_test_voxelization(group1, group2, signed=False, remove_nan=True, p_cutoff=None):
t-Test on differences between the individual voxels in group1 and group2
group1, group2 : array of arrays
The group of voxelizations to compare.
signed : bool
If True, return also the direction of the changes as +1 or -1.
remove_nan : bool
Remove Nan values from the data.
p_cutoff : None or float
Optional cutoff for the p-values.
p_values : array
The p values for the group wise comparison.
group1 = read_group(group1)
group2 = read_group(group2)
t_vals, p_vals = stats.ttest_ind(group1, group2, axis=0, equal_var=True)
if remove_nan:
p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)
if p_cutoff is not None:
p_vals = np.clip(p_vals, None, p_cutoff)
if signed:
return p_vals, np.sign(t_vals)
return p_vals
def t_test_region_counts(counts1, counts2, signed=False, remove_nan=True, p_cutoff=None, equal_var=False):
"""t-Test on differences in counts of points in labeled regions"""
t_vals, p_vals = stats.ttest_ind(counts1, counts2, axis=1, equal_var=equal_var)
if remove_nan:
p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)
if p_cutoff is not None:
p_vals = np.clip(p_vals, None, p_cutoff)
if signed:
return p_vals, np.sign(t_vals)
return p_vals
# TODO: group sources in IO
def read_group(sources, combine=True, **args):
"""Turn a list of sources for data into a numpy stack.
sources : list of str or sources
The sources to combine.
combine : bool
If true combine the sources to ndarray, oterhwise return a list.
group : array or list
The group data.
if isinstance(sources, np.ndarray):
return sources
# read the individual files
group = []
for f in sources:
data = clearmap_io.as_source(f, **args).array
data = np.reshape(data, (1,) + data.shape)
if combine:
return np.vstack(group)
return group
def weights_from_precentiles(intensities, percentiles=[25, 50, 75, 100]):
perc = np.percentiles(intensities, percentiles)
weights = np.zeros(intensities.shape)
for p in perc:
ii = intensities > p
weights[ii] = weights[ii] + 1
return weights
def __prepare_cumulative_data(data, offset): # FIXME: use better variable names
# fill up the low count data
n = np.array([x.size for x in data])
nm = n.max()
m = np.array([x.max() for x in data])
mm = m.max()
k = n.size
if offset is None:
# assume data starts at 0 !
offset = mm / nm # ideal for all statistics this should be mm + eps to have as little influence as possible.
datac = [x.copy() for x in data]
return datac, k, m, mm, n, nm, offset
def __plot_cumulative_test(datac, m, plot):
# test by plotting
if plot:
import matplotlib.pyplot as plt
for i in range(m.size):
plt.step(datac[i], np.arange(datac[i].size))
def __run_cumulative_test(datac, k, method): # FIXME: remove one letter variable names
if method in ('KolmogorovSmirnov', 'KS'):
if k == 2:
s, p = stats.ks_2samp(datac[0], datac[1])
raise RuntimeError('KolmogorovSmirnov only for 2 samples not %d' % k)
elif method in ('CramervonMises', 'CM'):
if k == 2:
s, p = clearmap_stat_tests.test_cramer_von_mises_2_sample(datac[0], datac[1])
raise RuntimeError('CramervonMises only for 2 samples not %d' % k)
elif method in ('AndersonDarling', 'AD'):
s, a, p = stats.anderson_ksamp(datac)
return p, s
def test_completed_cumulatives(data, method='AndersonDarling', offset=None, plot=False):
"""Test if data sets have the same number / intensity distribution by adding max intensity counts
to the smaller sized data sets and performing a distribution comparison test"""
# idea: fill up data points to the same numbers at the high intensity values and use KS test
# cf. work in progress on thoroughly testing the differences in histograms
datac, k, m, mm, n, nm, offset = __prepare_cumulative_data(data, offset)
for i in range(m.size):
if n[i] < nm:
datac[i] = np.concatenate((datac[i], np.ones(nm-n[i], dtype=datac[i].dtype) * (mm + offset))) # + 10E-5 * numpy.random.rand(nm-n[i])));
__plot_cumulative_test(datac, m, plot)
return __run_cumulative_test(datac, k, method)
def test_completed_inverted_cumulatives(data, method='AndersonDarling', offset=None, plot=False):
"""Test if data sets have the same number / intensity distribution by adding zero intensity counts
to the smaller sized data sets and performing a distribution comparison test on the reversed cumulative distribution"""
# idea: fill up data points to the same numbers at the high intensity values and use KS test
# cf. work in progress on thoroughly testing the differences in histograms
datac, k, m, mm, n, nm, offset = __prepare_cumulative_data(data, offset)
for i in range(m.size):
if n[i] < nm:
datac[i] = np.concatenate((-datac[i], np.ones(nm-n[i], dtype=datac[i].dtype) * (offset))) # + 10E-5 * numpy.random.rand(nm-n[i]))); # FIXME: only different lines with function above
datac[i] = -datac[i]
__plot_cumulative_test(datac, m, plot)
return __run_cumulative_test(datac, k, method)
def remove_p_val_nans(p_vals, t_vals):
invalid_idx = np.isnan(p_vals)
p_vals_c = p_vals.copy()
t_vals_c = t_vals.copy()
p_vals_c[invalid_idx] = 1.0
t_vals_c[invalid_idx] = 0
return p_vals_c, t_vals_c
def stack_voxelizations(directory, f_list, suffix):
Regroup voxelizations to simplify further processing
for i, file_name in enumerate(f_list):
img = clearmap_io.read(make_abs(directory, file_name))
if i == 0: # init on first image
stacked_voxelizations = img[:, :, :, np.newaxis]
stacked_voxelizations = np.concatenate((stacked_voxelizations, img[:, :, :, np.newaxis]), axis=3)
stacked_voxelizations = stacked_voxelizations.astype(np.float32)
clearmap_io.write(os.path.join(directory, f'stacked_density_{suffix}.tif'), stacked_voxelizations, bigtiff=True)
except ValueError:
return stacked_voxelizations
def average_voxelization_groups(stacked_voxelizations, directory, suffix, compute_sd=False):
avg_voxelization = np.mean(stacked_voxelizations, axis=3)
clearmap_io.write(os.path.join(directory, f'avg_density_{suffix}.tif'), avg_voxelization)
if compute_sd:
sd_voxelization = np.std(stacked_voxelizations, axis=3)
clearmap_io.write(os.path.join(directory, f'sd_density_{suffix}.tif'), sd_voxelization)
def __validate_colors(positive_color, negative_color):
if len(positive_color) != len(negative_color):
raise ValueError(f'Length of positive and negative colors do not match, '
f'got {len(positive_color)} and {len(negative_color)}')
def color_p_values(p_vals, p_sign, positive_color=[1, 0], negative_color=[0, 1], p_cutoff=None,
positive_trend=[0, 0, 1, 0], negative_trend=[0, 0, 0, 1], p_max=None):
p_vals np.array:
p_sign np.array:
positive list:
negative list:
p_cutoff float:
positive_trend list:
negative_trend list:
p_max float:
if p_max is None:
p_max = p_vals.max()
p_vals_inv = p_max - p_vals
if p_cutoff is None: # color given p values
__validate_colors(positive_color, negative_color)
# 3D + color output array
d = len(positive_color) # 3D
output_shape = p_vals.shape + (d,) # 3D + color
colored_p_vals = np.zeros(output_shape)
# coloring
for neg, col in ((False, positive_color), (True, negative_color)):
if neg:
ids = p_sign < 0
ids = p_sign > 0
p_vals_i = p_vals_inv[ids]
for i, channel_value in enumerate(col): # [i] on R, G, B components
colored_p_vals[ids, i] = p_vals_i * channel_value
# REFACTOR: move to visualisation module
def get_colored_p_vals(p_vals, t_vals, significance, color_names):
p_vals_f = np.clip(p_vals, None, significance)
p_sign = np.sign(t_vals)
return color_p_values(p_vals_f, p_sign,
def dirs_to_density_files(directory, f_list):
out = []
for i, f_name in enumerate(f_list):
f_name = make_abs(directory, f_name)
if not is_density_file(f_name):
f_name = find_density_file(f_name)
return out
def group_cells_counts(struct_ids, group_cells_dfs, sample_ids, volume_map):
struct_ids list:
group_cells_dfs: list(pd.DataFrame)
sample_ids: list
volume_map: dict
maps each id from structure_ids to the corresponding structure's volume (in pixel)
all_ints = False
if all_ints:
output = pd.DataFrame(columns=['id', 'hemisphere'] + [f'counts_{str(sample_ids[i]).zfill(2)}' for i in range(len(group_cells_dfs))])
output = pd.DataFrame(columns=['id', 'hemisphere'] + [f'counts_{sample_ids[i]}' for i in range(len(group_cells_dfs))])
output['id'] = np.tile(struct_ids, 2) # for each hemisphere
output['name'] = np.tile([annotation.find(id_, key='id')['name'] for id_ in struct_ids], 2)
output['hemisphere'] = np.repeat((0, 255), len(struct_ids)) # FIXME: translate hemisphere to plain text
output['volume'] = output.set_index(['id', 'hemisphere']).index.map(volume_map.get)
output = output[output['volume'].notna()]
for multiplier, hem_id in zip((1, 2), (0, 255)):
for j, sample_df in enumerate(group_cells_dfs):
if all_ints:
col_name = f'counts_{str(sample_ids[j]).zfill(2)}' # TODO: option with f'counts_{j}'
col_name = f'counts_{sample_ids[j]}'
hem_sample_df = sample_df[sample_df['hemisphere'] == hem_id]
# FIXME: replace loop (slow)
for i, struct_id in enumerate(struct_ids):
row_idx = output[(output['id'] == struct_id) & (output['hemisphere'] == hem_id)].index
output.loc[row_idx, col_name] = len(hem_sample_df[hem_sample_df['id'] == struct_id])
return output
def generate_summary_table(cells_dfs, p_cutoff=None):
gp_names = list(cells_dfs.keys())
grouped_counts = []
total_df = pd.DataFrame({k: cells_dfs[gp_names[0]][k] for k in ('id', 'name', 'volume', 'hemisphere')})
for i, gp_name in enumerate(gp_names):
for col_name in cells_dfs[gp_name].columns:
if 'count' in col_name:
col = cells_dfs[gp_name][col_name]
new_col_name = f'{gp_names[i]}_{col_name}'
total_df[new_col_name] = col
grouped_counts[i][new_col_name] = col
total_df[f'mean_{gp_name}'] = grouped_counts[i].mean(axis=1).astype(float) # To avoid "object" type
total_df[f'sd_{gp_name}'] = grouped_counts[i].std(axis=1).astype(float) # To avoid "object" type
total_df, grouped_counts = sanitize_df(gp_names, grouped_counts, total_df)
gp1 = grouped_counts[0].values.astype(int)
gp2 = grouped_counts[1].values.astype(int)
p_vals, p_signs = t_test_region_counts(gp1, gp2, p_cutoff=p_cutoff, signed=True)
total_df['p_value'] = p_vals
total_df['q_value'] = clearmap_FDR.estimate_q_values(p_vals)
total_df['p_sign'] = p_signs.astype(int)
return total_df
def sanitize_df(gp_names, grouped_counts, total_df):
Remove rows with all 0 or NaN in at least 1 group
bad_idx = total_df[f'mean_{gp_names[0]}'] == 0 # FIXME: check that either not and
bad_idx = np.logical_or(bad_idx, total_df[f'mean_{gp_names[1]}'] == 0)
bad_idx = np.logical_or(bad_idx, np.isnan(total_df[f'mean_{gp_names[0]}']))
bad_idx = np.logical_or(bad_idx, np.isnan(total_df[f'mean_{gp_names[1]}']))
return total_df[~bad_idx], [grouped_counts[0][~bad_idx], grouped_counts[1][~bad_idx]]
def dirs_to_cells_dfs(directory, dirs):
out = []
for i, f_name in enumerate(dirs):
f_name = make_abs(directory, f_name)
if not f_name.endswith('cells.feather'):
f_name = find_cells_df(f_name)
return out
def get_volume_map(folder):
preproc = init_preprocessor(folder)
return annotation.annotation.get_lateralised_volume_map(
def make_summary(directory, gp1_name, gp2_name, gp1_dirs, gp2_dirs, output_path=None, save=True):
gp1_dfs = dirs_to_cells_dfs(directory, gp1_dirs)
gp2_dfs = dirs_to_cells_dfs(directory, gp2_dirs)
gp_cells_dfs = [gp1_dfs, gp2_dfs]
structs = get_all_structs(gp1_dfs + gp2_dfs)
gp1_sample_ids = [dir_to_sample_id(folder) for folder in gp1_dirs]
gp2_sample_ids = [dir_to_sample_id(folder) for folder in gp2_dirs]
sample_ids = [gp1_sample_ids, gp2_sample_ids]
volume_map = get_volume_map(gp1_dirs[0]) # WARNING Hacky
aggregated_dfs = {gp_name: group_cells_counts(structs, gp_cells_dfs[i], sample_ids[i], volume_map)
for i, gp_name in enumerate((gp1_name, gp2_name))}
total_df = generate_summary_table(aggregated_dfs)
if output_path is None and save:
output_path = os.path.join(directory, f'statistics_{gp1_name}_{gp2_name}.csv')
if save:
return total_df
def density_files_are_comparable(directory, gp1_dirs, gp2_dirs):
gp1_f_list = dirs_to_density_files(directory, gp1_dirs)
gp2_f_list = dirs_to_density_files(directory, gp2_dirs)
all_files = gp1_f_list + gp2_f_list
sizes = [os.path.getsize(f) for f in all_files]
tolerance = 1024 # 1 KB
comparable = all(math.isclose(s, sizes[0], abs_tol=tolerance) for s in sizes)
if comparable:
return True
raise GroupStatsError(f'Could not compare files, sizes differ\n\n'
f'Group 1: {gp1_f_list}\n'
f'Group 2: {gp2_f_list}\n'
f'Sizes 1: {[os.path.getsize(f) for f in gp1_f_list]}\n'
f'Sizes 2: {[os.path.getsize(f) for f in gp2_f_list]}\n')
def compare_groups(directory, gp1_name, gp2_name, gp1_dirs, gp2_dirs, prefix='p_val_colors', advanced=True):
gp1_f_list = dirs_to_density_files(directory, gp1_dirs)
gp2_f_list = dirs_to_density_files(directory, gp2_dirs)
gp1_stacked_voxelizations = stack_voxelizations(directory, gp1_f_list, suffix=gp1_name)
average_voxelization_groups(gp1_stacked_voxelizations, directory, gp1_name, compute_sd=advanced)
gp2_stacked_voxelizations = stack_voxelizations(directory, gp2_f_list, suffix=gp2_name)
average_voxelization_groups(gp2_stacked_voxelizations, directory, gp2_name, compute_sd=advanced)
t_vals, p_vals = stats.ttest_ind(gp1_stacked_voxelizations, gp2_stacked_voxelizations, axis=3, equal_var=False)
p_vals, t_vals = remove_p_val_nans(p_vals, t_vals)
colored_p_vals_05 = get_colored_p_vals(p_vals, t_vals, 0.05, ('red', 'green'))
colored_p_vals_01 = get_colored_p_vals(p_vals, t_vals, 0.01, ('green', 'blue'))
colored_p_vals = np.maximum(colored_p_vals_05, colored_p_vals_01).astype(np.uint8)
output_f_name = f'{prefix}_{gp1_name}_{gp2_name}.tif'
output_file_path = os.path.join(directory, output_f_name)
clearmap_io.write(output_file_path, colored_p_vals, photometric='rgb', imagej=True)
if advanced:
effect_size = np.abs(np.mean(gp1_stacked_voxelizations, axis=3).astype(int) -
np.mean(gp2_stacked_voxelizations, axis=3).astype(int))
effect_size = effect_size.astype(np.uint16) # for imagej compatibility
output_f_name = f'effect_size_{gp1_name}_{gp2_name}.tif'
output_file_path = os.path.join(directory, output_f_name)
clearmap_io.write(output_file_path, effect_size, imagej=True)
return colored_p_vals
