from typing import List, Optional, Tuple
from ..froi import FROIConfig, _get_orthogonalized_froi_data, _get_froi_data
from ..contrast import (
_get_orthogonalized_contrast_data,
_get_contrast_data,
_check_orthogonal,
)
from ..parcels import get_parcels
from ..parcels import is_no_parcels
import pandas as pd
from ..utils import validate_arguments
import numpy as np
import warnings
from .utils import AnalysisSaver
[docs]
class EffectEstimator(AnalysisSaver):
"""
Estimate the effect of a ROI on the data.
:param subjects: List of subject labels.
:type subjects: List[str]
:param froi: fROI configuration to estimate the effect of.
:type froi: FROIConfig
:param fill_na_with_zero: Whether to fill NaN values with zero. If False,
NaN values will be ignored. Default is True.
:type fill_na_with_zero: Optional[bool]
:param orthogonalization: The orthogonalization method. Options are
'all-but-one' and 'odd-even'. Default is 'all-but-one'.
:type orthogonalization: Optional[str]
:param froi_run_label: Label of the run to extract effects for. If not specified,
the method will automatically use orthogonalization when the effects are
not orthogonal to the fROI contrasts, and all runs if the effects are
orthogonal to the fROI contrasts.
:type run_label: Optional[str]
"""
@validate_arguments(orthogonalization={"all-but-one", "odd-even"})
def __init__(
self,
subjects: List[str],
froi: FROIConfig,
fill_na_with_zero: Optional[bool] = True,
orthogonalization: Optional[str] = "all-but-one",
froi_run_label: Optional[str] = None,
):
self.subjects = subjects
self.froi = froi
self.fill_na_with_zero = fill_na_with_zero
self.orthogonalization = orthogonalization
self.froi_run_label = froi_run_label
self._type = "effect"
self._data_summary = None
self._data_detail = None
# Preload the parcel labels
_, self.froi_labels = get_parcels(self.froi.parcels)
self._has_explicit_parcels = not is_no_parcels(self.froi.parcels)
[docs]
def run(
self, task: str, effects: List[str], effect_run_label: Optional[str] = None
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Run the effect estimation. The results are stored in the analysis
output folder.
:param task: Task label.
:type task: str
:param effects: List of effect labels.
:type effects: List[str]
:param effect_run_label: Label of the run to extract effects for. If not specified,
the method will automatically use orthogonalization when the effects are
not orthogonal to the fROI contrasts, and all runs if the effects are
orthogonal to the fROI contrasts.
:type effect_run_label: Optional[str]
:return: The results are returned as a tuple of two dataframes: the
effect estimates averaged across runs, and the effect estimates
detailed by run.
:rtype: Tuple[pd.DataFrame, pd.DataFrame]
"""
self.task = task
self.effects = effects
contrasts = np.array(self.effects)
use_customized_runs = False
if effect_run_label is not None or self.froi_run_label is not None:
if effect_run_label is not None and self.froi_run_label is not None:
use_customized_runs = True
else:
raise ValueError(
"effect_run_label and froi_run_label must both be specified"
)
# Load the data
data_summary = []
data_detail = []
for subject in self.subjects:
if not use_customized_runs:
okorths = np.array(
[
_check_orthogonal(
subject,
self.task,
[contrast],
self.froi.task,
self.froi.contrasts,
)
for contrast in self.effects
]
)
okorth = np.all(okorths)
froi_all = _get_froi_data(subject, self.froi, "all")
if froi_all is None:
warnings.warn(
f"Data not found for subject {subject}, fROI {self.froi}, "
"skipping."
)
continue
froi_all = froi_all[None, :]
if not okorth:
froi_orth, froi_orth_labels = _get_orthogonalized_froi_data(
subject, self.froi, 1, self.orthogonalization
)
if froi_orth is None:
warnings.warn(
f"Data not found for subject {subject}, fROI {self.froi} "
"for the speicial orthogonalization, skipping those "
"non-orthogonal effects."
)
continue
for i, contrast in enumerate(contrasts):
if use_customized_runs:
data_i_effect = _get_contrast_data(
subject, self.task, effect_run_label, contrast, "effect"
)
if data_i_effect is not None:
data_i_effect = data_i_effect[None, :]
data_i_froi = _get_froi_data(subject, self.froi, self.froi_run_label)
if data_i_froi is None:
warnings.warn(
f"Data not found for subject {subject}, fROI {self.froi} "
f"for the run label {self.froi_run_label}, skipping."
)
continue
data_i_froi = data_i_froi[None, :]
effect_run_labels, froi_run_labels = [effect_run_label], [self.froi_run_label]
elif okorth:
data_i_effect = _get_contrast_data(
subject, self.task, "all", contrast, "effect"
)
if data_i_effect is not None:
data_i_effect = data_i_effect[None, :]
data_i_froi = froi_all
effect_run_labels, froi_run_labels = ["all"], ["all"]
else:
(
data_i_effect,
run_label,
) = _get_orthogonalized_contrast_data(
subject,
self.task,
contrast,
2,
"effect",
self.orthogonalization,
)
data_i_froi = froi_orth
effect_run_labels, froi_run_labels = (
run_label,
froi_orth_labels,
)
if data_i_effect is None:
warnings.warn(
f"Data not found for subject {subject}, effect "
f"{contrast}, skipping."
)
continue
df_summary, df_detail = self._run(
data_i_effect, data_i_froi, self.fill_na_with_zero
)
if self.froi_labels is not None:
df_summary["froi"] = df_summary["froi"].apply(
lambda x: self.froi_labels[x]
)
df_detail["froi"] = df_detail["froi"].apply(
lambda x: self.froi_labels[x]
)
elif not self._has_explicit_parcels:
df_summary = df_summary.drop(columns=["froi"])
df_detail = df_detail.drop(columns=["froi"])
df_detail["effect_run"] = df_detail["run"].apply(
lambda x: effect_run_labels[x]
)
df_detail["froi_run"] = df_detail["run"].apply(
lambda x: froi_run_labels[x]
)
df_detail = df_detail.drop(columns=["run"])
df_summary["subject"] = subject
df_detail["subject"] = subject
df_summary["effect"] = contrast
df_detail["effect"] = contrast
data_summary.append(df_summary)
data_detail.append(df_detail)
# Save and return the results
self._data_summary = pd.concat(data_summary)
self._data_detail = pd.concat(data_detail)
new_effects_info = pd.DataFrame(
{
"task": [self.task],
"effects": [self.effects],
"fill_na_with_zero": [self.fill_na_with_zero],
"orthogonalization": [self.orthogonalization],
"froi": [self.froi],
"customized_effect_run": [effect_run_label],
"customized_froi_run": [self.froi_run_label],
}
)
self._save(new_effects_info)
return self._data_summary, self._data_detail
@staticmethod
def _run(
effect_data: np.ndarray,
froi_masks: np.ndarray,
fill_na_with_zero: bool,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Run the effect estimation.
:param effect_data: The effect data, with shape (n_runs, n_voxels).
:type effect_data: np.ndarray
:param froi_masks: The fROI masks, with shape (n_runs, n_voxels).
:type froi_masks: np.ndarray
:param fill_na_with_zero: Whether to fill NaN values with zero. If
False, NaN values will be ignored.
:type fill_na_with_zero: bool
:return: The effect estimates averaged across runs, and the effect
estimates detailed by run.
:rtype: Tuple[pd.DataFrame, pd.DataFrame]
"""
if len(effect_data.shape) != 2:
raise ValueError(
"effect_data should have shape (n_runs, n_voxels)"
)
if len(froi_masks.shape) != 2:
raise ValueError("froi_masks should have shape (n_runs, n_voxels)")
if effect_data.shape != froi_masks.shape:
raise ValueError(
"effect_data and froi_masks should have the same shape"
)
# Reduce the data size
non_nan_voxels = np.any(
~np.isnan(froi_masks) & (froi_masks != 0), axis=0
)
effect_data = effect_data[:, non_nan_voxels]
froi_masks = froi_masks[:, non_nan_voxels]
if fill_na_with_zero:
effect_data[np.isnan(effect_data)] = 0
# Compute the effect size
froi_masks_expanded = froi_masks[None, :, :]
froi_labels = np.unique(froi_masks)
froi_labels = froi_labels[froi_labels != 0 & ~np.isnan(froi_labels)]
froi_masks_expanded = (
froi_masks_expanded == froi_labels[:, None, None]
).astype(float)
froi_masks_expanded[froi_masks_expanded == 0] = np.nan
masked_effect = effect_data[None, :, :] * froi_masks_expanded
effect_size = np.nanmean(masked_effect, axis=(-1))
localizer_size = np.nansum(froi_masks_expanded, axis=(-1))
df_detail = pd.DataFrame(
{
"froi": np.repeat(froi_labels, effect_data.shape[0]),
"run": np.tile(
np.arange(effect_data.shape[0]), len(froi_labels)
),
"n_voxels": localizer_size.flatten(),
"localizer_size": localizer_size.flatten(),
"size": effect_size.flatten(),
}
)
df_summary = (
df_detail.groupby("froi")
.agg({"localizer_size": "mean", "size": "mean"})
.reset_index()
)
return df_summary, df_detail