from pathlib import Path
from typing import Dict, List, Optional, Union, Tuple
import hashlib
import json
import re
import warnings
import numpy as np
import pandas as pd
from nilearn import glm, image, maskers, signal
from nilearn.image import load_img
from nilearn.interfaces.bids.query import get_bids_files
from nilearn.surface import SurfaceImage
from .._surface import (
SURFACE_HEMIS,
SURFACE_PARTS,
flatten_image_data,
is_surface_image,
load_surface_image,
save_surface_image,
)
from ..contrast import _get_run_group_info
from ..first_level.nilearn import _find_surface_mesh_paths, _smooth_surface_array
from ..froi import FROIConfig, _get_froi_data
from ..parcels import ParcelsConfig, get_parcels, is_no_parcels
from ..settings import (
get_bids_data_folder,
get_bids_deriv_folder,
get_bids_preprocessed_folder,
)
from .utils import AnalysisSaver
FC_CLEAN_CONFIG = {
"clean_surf": False,
"mask_suffix": "_desc-brain_mask.nii.gz",
# Match the CLiMB-style postfprep volume cleaner, which does not smooth the
# mask in its active code path.
"mask_fwhm": None,
"target_affine": None,
"confounds_regex": (
r"^(global_signal|framewise_displacement|trans_[xyz]|rot_[xyz]|"
r"a_comp_cor_0[0-4]|cosine.*)$"
),
"fd_threshold": 0.5,
"volume_fwhm": 4,
"surface_fwhm": 4,
"standardize": True,
"detrend": True,
"regress_out_task": True,
"task_conditions": None,
"regress_task_conditions": None,
"concat_conditions_across_runs": False,
"low_pass": 0.1,
"high_pass": 0.01,
"min_T": 50,
}
RUN_RE = re.compile(r"_run-([A-Za-z0-9]+)_")
SPACE_RE = re.compile(r"_space-([A-Za-z0-9]+)_")
SES_RE = re.compile(r"_ses-([A-Za-z0-9]+)_")
ENTITY_RE = re.compile(r"(^|_)([A-Za-z0-9]+)-([^_]+)")
FC_PREPROC_DERIV_NAME = "fconn_preproc"
FC_PREPROC_CONFIG_KEYS = (
"clean_surf",
"mask_suffix",
"mask_fwhm",
"target_affine",
"confounds_regex",
"fd_threshold",
"volume_fwhm",
"surface_fwhm",
"standardize",
"detrend",
"regress_out_task",
"regress_task_conditions",
"low_pass",
"high_pass",
)
def _normalize_standardize_arg(standardize):
if standardize is True:
return "zscore_sample"
if standardize is False:
return None
return standardize
def _masker_fit_transform_with_explicit_mask(masker, func_img, confounds):
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=(
r"\[NiftiMasker\.fit\] Generation of a mask has been requested "
r"\(imgs != None\) while a mask was given at masker creation\. "
r"Given mask will be used\."
),
category=UserWarning,
)
return masker.fit_transform(func_img, confounds=confounds)
def _normalize_task_conditions(
task_conditions: Optional[Union[str, List[str], Tuple[str, ...]]]
) -> Optional[List[str]]:
if task_conditions is None:
return None
if isinstance(task_conditions, str):
return [task_conditions]
return list(task_conditions)
def _normalize_run_filter_token(run_filter_token) -> str:
run_filter_str = str(run_filter_token)
if run_filter_str.isdigit():
return f"{int(run_filter_str):02d}"
run_filter_lower = run_filter_str.lower()
if run_filter_lower in {"all", "odd", "even"}:
return run_filter_lower
if run_filter_lower.startswith("orth") and run_filter_lower[4:].isdigit():
return f"orth{int(run_filter_lower[4:]):02d}"
return run_filter_str
def _normalize_run_filter(
run_filter: Optional[Union[str, int, List[Union[str, int]], Tuple[Union[str, int], ...]]]
):
if run_filter is None:
return None
if isinstance(run_filter, (list, tuple, set)):
return [_normalize_run_filter_token(run_i) for run_i in run_filter]
return _normalize_run_filter_token(run_filter)
def _frame_times_from_sidecar(
sidecar: Dict,
TR: float,
start_time: float,
n_timepoints: int,
) -> np.ndarray:
slice_time_ref = sidecar.get("SliceTimingCorrected")
if slice_time_ref is True:
slice_time_ref = 0.5
elif slice_time_ref is None:
slice_time_ref = 0.0
else:
slice_time_ref = float(slice_time_ref)
return np.arange(n_timepoints) * TR + start_time + (slice_time_ref * TR)
def _parse_bids_entities(path: Path) -> Dict[str, str]:
stem = path.name
for suffix in (".nii.gz", ".func.gii", ".tsv", ".json"):
if stem.endswith(suffix):
stem = stem[: -len(suffix)]
break
entities = {}
for match in ENTITY_RE.finditer(stem):
entities[match.group(2)] = match.group(3)
return entities
def _find_matching_bids_file(
directory: Path,
reference_file: Path,
suffix: str,
*,
required_entities: Optional[Tuple[str, ...]] = None,
) -> Optional[Path]:
if required_entities is None:
required_entities = ("sub", "ses", "task", "acq", "ce", "dir", "rec", "run")
reference_entities = _parse_bids_entities(reference_file)
reference_entities = {
key: reference_entities[key]
for key in required_entities
if key in reference_entities
}
matches = []
for candidate in sorted(directory.glob(f"*{suffix}")):
candidate_entities = _parse_bids_entities(candidate)
if all(
candidate_entities.get(key) == value
for key, value in reference_entities.items()
):
matches.append(candidate)
if len(matches) == 0:
return None
return matches[0]
def _make_json_serializable(value):
if isinstance(value, dict):
return {
str(key): _make_json_serializable(val)
for key, val in value.items()
}
if isinstance(value, (list, tuple)):
return [_make_json_serializable(val) for val in value]
if isinstance(value, Path):
return str(value)
if isinstance(value, np.ndarray):
return value.tolist()
if isinstance(value, np.generic):
return value.item()
return value
def _build_fc_preproc_config(clean_config: Dict) -> Dict:
return {
key: _make_json_serializable(clean_config[key])
for key in FC_PREPROC_CONFIG_KEYS
if key in clean_config
}
def _fc_preproc_config_hash(preproc_config: Dict) -> str:
payload = json.dumps(preproc_config, sort_keys=True, separators=(",", ":"))
return hashlib.sha1(payload.encode("utf-8")).hexdigest()[:12]
def _get_fc_preproc_root() -> Path:
return get_bids_deriv_folder() / FC_PREPROC_DERIV_NAME
def _with_global_run_label(filename: str, run_label: str) -> str:
if RUN_RE.search(filename) is not None:
return RUN_RE.sub(f"_run-{run_label}_", filename, count=1)
if "_desc-" in filename:
return filename.replace("_desc-", f"_run-{run_label}_desc-", 1)
if "_bold" in filename:
return filename.replace("_bold", f"_run-{run_label}_bold", 1)
return filename
def _strip_session_entity(filename: str) -> str:
return SES_RE.sub("_", filename, count=1)
def _get_fc_preproc_record_paths(record: Dict, preproc_config: Dict) -> Dict[str, Path]:
config_hash = _fc_preproc_config_hash(preproc_config)
if "func_file" in record:
source_file = record["func_file"]
else:
source_file = record["func_files"]["L"]
source_entities = _parse_bids_entities(source_file)
subject_label = source_entities.get("sub")
if subject_label is None:
raise ValueError(
f"Could not infer subject label from cache source file: {source_file}"
)
output_dir = _get_fc_preproc_root() / f"sub-{subject_label}" / "func"
output_dir.mkdir(parents=True, exist_ok=True)
if "func_file" in record:
output_name = _strip_session_entity(source_file.name)
output_name = _with_global_run_label(output_name, record["run_label"])
output_name = output_name.replace(
"_desc-preproc_bold.nii.gz",
f"_desc-fcpreproc-{config_hash}_bold.nii.gz",
)
img_path = output_dir / output_name
meta_path = output_dir / output_name.replace(".nii.gz", ".json")
return {"img": img_path, "meta": meta_path}
paths = {}
for hemi, source_path in record["func_files"].items():
output_name = _strip_session_entity(source_path.name)
output_name = _with_global_run_label(output_name, record["run_label"])
output_name = output_name.replace(
"_desc-preproc_bold.func.gii",
f"_desc-fcpreproc-{config_hash}_bold.func.gii",
)
paths[hemi] = output_dir / output_name
meta_path = output_dir / paths["L"].name.replace(".func.gii", ".json")
paths["meta"] = meta_path
return paths
def _get_img_n_timepoints(img) -> int:
if is_surface_image(img):
return int(np.asarray(img.data.parts["left"]).shape[-1])
data = img.get_fdata()
if data.ndim < 4:
return 1
return int(data.shape[-1])
def _subset_cleaned_img(img, selected_frames: np.ndarray):
if is_surface_image(img):
return SurfaceImage(
mesh=img.mesh,
data={
part_name: np.asarray(img.data.parts[part_name])[:, selected_frames]
for part_name in img.data.parts
},
)
data = img.get_fdata()
if data.ndim != 4:
raise ValueError("Cleaned functional image must be 4D.")
return image.new_img_like(
img,
data[..., selected_frames],
copy_header=True,
)
def _load_cached_preprocessed_run(record: Dict, preproc_config: Dict):
cache_paths = _get_fc_preproc_record_paths(record, preproc_config)
if "img" in cache_paths:
if not cache_paths["img"].exists():
return None
return {
"cleaned_img": load_img(cache_paths["img"]),
"cache_paths": cache_paths,
**record,
}
if not all(cache_paths[hemi].exists() for hemi in SURFACE_HEMIS):
return None
return {
"cleaned_img": load_surface_image(
{hemi: cache_paths[hemi] for hemi in SURFACE_HEMIS},
record["mesh_paths"],
),
"cache_paths": cache_paths,
**record,
}
def _save_cached_preprocessed_run(cleaned_img, record: Dict, preproc_config: Dict):
cache_paths = _get_fc_preproc_record_paths(record, preproc_config)
metadata = {
"source_files": _make_json_serializable(
record.get("func_file", record.get("func_files"))
),
"confounds_file": _make_json_serializable(record["confounds_file"]),
"events_file": _make_json_serializable(record["events_file"]),
"run_label": record["run_label"],
"bids_run_label": record.get("bids_run_label"),
"session_label": record["session_label"],
"TR": record["TR"],
"StartTime": record["StartTime"],
"preproc_config": preproc_config,
"config_hash": _fc_preproc_config_hash(preproc_config),
}
if "img" in cache_paths:
cleaned_img.to_filename(cache_paths["img"])
else:
save_surface_image(
cleaned_img,
{hemi: cache_paths[hemi] for hemi in SURFACE_HEMIS},
)
with open(cache_paths["meta"], "w") as f:
json.dump(metadata, f, indent=2, sort_keys=True)
return {
"cleaned_img": cleaned_img,
"cache_paths": cache_paths,
**record,
}
[docs]
def preprocess_bold_for_fc(
subjects: Union[str, List[str]],
task: str,
*,
run_filter: Optional[
Union[str, int, List[Union[str, int]], Tuple[Union[str, int], ...]]
] = None,
space: Optional[str] = None,
clean_surf: bool = False,
mask_suffix: str = FC_CLEAN_CONFIG["mask_suffix"],
mask_fwhm: Optional[float] = FC_CLEAN_CONFIG["mask_fwhm"],
target_affine: Optional[List[List[float]]] = None,
confounds_regex: str = FC_CLEAN_CONFIG["confounds_regex"],
fd_threshold: Optional[float] = FC_CLEAN_CONFIG["fd_threshold"],
volume_fwhm: Optional[float] = FC_CLEAN_CONFIG["volume_fwhm"],
surface_fwhm: Optional[float] = FC_CLEAN_CONFIG["surface_fwhm"],
standardize: Union[bool, str, None] = FC_CLEAN_CONFIG["standardize"],
detrend: bool = FC_CLEAN_CONFIG["detrend"],
regress_out_task: bool = FC_CLEAN_CONFIG["regress_out_task"],
regress_task_conditions: Optional[
Union[str, List[str], Tuple[str, ...]]
] = None,
low_pass: Optional[float] = FC_CLEAN_CONFIG["low_pass"],
high_pass: Optional[float] = FC_CLEAN_CONFIG["high_pass"],
min_T: int = FC_CLEAN_CONFIG["min_T"],
overwrite: bool = False,
) -> pd.DataFrame:
if isinstance(subjects, str):
subjects = [subjects]
clean_config = {
"clean_surf": clean_surf,
"mask_suffix": mask_suffix,
"mask_fwhm": mask_fwhm,
"target_affine": target_affine,
"confounds_regex": confounds_regex,
"fd_threshold": fd_threshold,
"volume_fwhm": volume_fwhm,
"surface_fwhm": surface_fwhm,
"standardize": _normalize_standardize_arg(standardize),
"detrend": detrend,
"regress_out_task": regress_out_task,
"regress_task_conditions": _normalize_task_conditions(
regress_task_conditions
),
"task_conditions": None,
"low_pass": low_pass,
"high_pass": high_pass,
"min_T": min_T,
}
preproc_config = _build_fc_preproc_config(clean_config)
manifest_rows = []
for subject in subjects:
prepared_runs = load_preprocessed_bold_for_fc(
subject,
task,
run_filter=run_filter,
space=space,
clean_surf=clean_surf,
mask_suffix=mask_suffix,
mask_fwhm=mask_fwhm,
target_affine=target_affine,
confounds_regex=confounds_regex,
fd_threshold=fd_threshold,
volume_fwhm=volume_fwhm,
surface_fwhm=surface_fwhm,
standardize=standardize,
detrend=detrend,
regress_out_task=regress_out_task,
regress_task_conditions=regress_task_conditions,
low_pass=low_pass,
high_pass=high_pass,
min_T=min_T,
create_if_missing=True,
overwrite=overwrite,
)
for prepared in prepared_runs:
row = {
"subject": subject,
"task": task,
"run_label": prepared["run_label"],
"bids_run_label": prepared.get("bids_run_label"),
"config_hash": _fc_preproc_config_hash(preproc_config),
}
row.update(
{
f"cache_{key}": str(path)
for key, path in prepared["cache_paths"].items()
}
)
manifest_rows.append(row)
return pd.DataFrame(manifest_rows)
[docs]
def load_preprocessed_bold_for_fc(
subject: str,
task: str,
*,
run_filter: Optional[
Union[str, int, List[Union[str, int]], Tuple[Union[str, int], ...]]
] = None,
space: Optional[str] = None,
clean_surf: bool = False,
mask_suffix: str = FC_CLEAN_CONFIG["mask_suffix"],
mask_fwhm: Optional[float] = FC_CLEAN_CONFIG["mask_fwhm"],
target_affine: Optional[List[List[float]]] = None,
confounds_regex: str = FC_CLEAN_CONFIG["confounds_regex"],
fd_threshold: Optional[float] = FC_CLEAN_CONFIG["fd_threshold"],
volume_fwhm: Optional[float] = FC_CLEAN_CONFIG["volume_fwhm"],
surface_fwhm: Optional[float] = FC_CLEAN_CONFIG["surface_fwhm"],
standardize: Union[bool, str, None] = FC_CLEAN_CONFIG["standardize"],
detrend: bool = FC_CLEAN_CONFIG["detrend"],
regress_out_task: bool = FC_CLEAN_CONFIG["regress_out_task"],
regress_task_conditions: Optional[
Union[str, List[str], Tuple[str, ...]]
] = None,
low_pass: Optional[float] = FC_CLEAN_CONFIG["low_pass"],
high_pass: Optional[float] = FC_CLEAN_CONFIG["high_pass"],
min_T: int = FC_CLEAN_CONFIG["min_T"],
create_if_missing: bool = False,
overwrite: bool = False,
) -> List[Dict]:
clean_config = {
"clean_surf": clean_surf,
"mask_suffix": mask_suffix,
"mask_fwhm": mask_fwhm,
"target_affine": target_affine,
"confounds_regex": confounds_regex,
"fd_threshold": fd_threshold,
"volume_fwhm": volume_fwhm,
"surface_fwhm": surface_fwhm,
"standardize": _normalize_standardize_arg(standardize),
"detrend": detrend,
"regress_out_task": regress_out_task,
"regress_task_conditions": _normalize_task_conditions(
regress_task_conditions
),
"task_conditions": None,
"low_pass": low_pass,
"high_pass": high_pass,
"min_T": min_T,
}
preproc_config = _build_fc_preproc_config(clean_config)
if clean_surf:
run_records = FunctionalConnectivityEstimator._find_preprocessed_surface_runs(
subject, task, run_filter, space
)
else:
run_records = FunctionalConnectivityEstimator._find_preprocessed_runs(
subject, task, run_filter, space, mask_suffix
)
prepared_runs = []
for record in run_records:
prepared = None
if not overwrite:
prepared = _load_cached_preprocessed_run(record, preproc_config)
if prepared is None and create_if_missing:
if clean_surf:
cleaned_img = FunctionalConnectivityEstimator._clean_surface_run(
record, clean_config
)
else:
cleaned_img = FunctionalConnectivityEstimator._clean_run(
record, clean_config
)
if cleaned_img is None:
continue
prepared = _save_cached_preprocessed_run(
cleaned_img, record, preproc_config
)
if prepared is not None:
prepared_runs.append(prepared)
return prepared_runs
[docs]
class FunctionalConnectivityEstimator(AnalysisSaver):
"""
Estimate within-subject functional connectivity from cleaned
preprocessed BOLD runs.
"""
def __init__(
self,
subjects: List[str],
froi1: Union[FROIConfig, str, ParcelsConfig],
froi2: Union[FROIConfig, str, ParcelsConfig],
):
self.subjects = subjects
self.froi1 = froi1
self.froi2 = froi2
self._type = "fconn"
self._data_summary = None
self._data_detail = None
[docs]
def run(
self,
task: Optional[str] = None,
froi1_run_label: Optional[str] = None,
froi2_run_label: Optional[str] = None,
run_filter: Optional[
Union[str, int, List[Union[str, int]], Tuple[Union[str, int], ...]]
] = None,
space: Optional[str] = None,
clean_surf: bool = False,
mask_suffix: str = FC_CLEAN_CONFIG["mask_suffix"],
mask_fwhm: Optional[float] = FC_CLEAN_CONFIG["mask_fwhm"],
target_affine: Optional[List[List[float]]] = None,
confounds_regex: str = FC_CLEAN_CONFIG["confounds_regex"],
fd_threshold: Optional[float] = 0.5,
volume_fwhm: Optional[float] = 4,
surface_fwhm: Optional[float] = 4,
standardize: Union[bool, str, None] = True,
detrend: bool = True,
regress_out_task: bool = True,
task_conditions: Optional[Union[str, List[str], Tuple[str, ...]]] = None,
regress_task_conditions: Optional[
Union[str, List[str], Tuple[str, ...]]
] = None,
concat_conditions_across_runs: bool = False,
low_pass: Optional[float] = 0.1,
high_pass: Optional[float] = 0.01,
min_T: int = 50,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Run the functional connectivity analysis for all subjects.
"""
self.froi1_run_label = froi1_run_label
self.froi2_run_label = froi2_run_label
task = self._resolve_task(task, self.froi1, self.froi2)
clean_config = {
"clean_surf": clean_surf,
"mask_suffix": mask_suffix,
"mask_fwhm": mask_fwhm,
"target_affine": target_affine,
"confounds_regex": confounds_regex,
"fd_threshold": fd_threshold,
"volume_fwhm": volume_fwhm,
"surface_fwhm": surface_fwhm,
"standardize": _normalize_standardize_arg(standardize),
"detrend": detrend,
"regress_out_task": regress_out_task,
"task_conditions": _normalize_task_conditions(task_conditions),
"regress_task_conditions": _normalize_task_conditions(
regress_task_conditions
),
"concat_conditions_across_runs": concat_conditions_across_runs,
"low_pass": low_pass,
"high_pass": high_pass,
"min_T": min_T,
}
data_summary = []
data_detail = []
for subject in self.subjects:
froi1_labels, froi1_name, froi1_has_explicit_parcels, froi1_img = (
self._resolve_mask_data(self.froi1, subject, froi1_run_label)
)
froi2_labels, froi2_name, froi2_has_explicit_parcels, froi2_img = (
self._resolve_mask_data(self.froi2, subject, froi2_run_label)
)
subject_clean_config = dict(clean_config)
subject_clean_config["clean_surf"] = (
subject_clean_config["clean_surf"]
or is_surface_image(froi1_img)
or is_surface_image(froi2_img)
)
if subject_clean_config["clean_surf"] and not (
is_surface_image(froi1_img) and is_surface_image(froi2_img)
):
raise ValueError(
"Surface functional connectivity requires both ROI inputs to "
"be surface-based."
)
(
cleaned_imgs,
cleaned_run_labels,
cleaned_session_labels,
) = self._get_cleaned_imgs_by_run(
subject, task, run_filter, space, subject_clean_config
)
df_summary, df_detail = self._run(cleaned_imgs, froi1_img, froi2_img)
df_detail["bold_run"] = df_detail["run"].apply(
lambda idx: cleaned_run_labels[idx]
)
df_detail["bold_session"] = df_detail["run"].apply(
lambda idx: cleaned_session_labels[idx]
)
df_detail = df_detail.drop(columns=["run"])
if froi1_labels is not None:
df_summary["froi1"] = df_summary["froi1"].apply(
lambda x: froi1_labels[x]
)
df_detail["froi1"] = df_detail["froi1"].apply(
lambda x: froi1_labels[x]
)
elif not froi1_has_explicit_parcels:
df_summary = df_summary.drop(columns=["froi1"])
df_detail = df_detail.drop(columns=["froi1"])
if froi2_labels is not None:
df_summary["froi2"] = df_summary["froi2"].apply(
lambda x: froi2_labels[x]
)
df_detail["froi2"] = df_detail["froi2"].apply(
lambda x: froi2_labels[x]
)
elif not froi2_has_explicit_parcels:
df_summary = df_summary.drop(columns=["froi2"])
df_detail = df_detail.drop(columns=["froi2"])
if froi1_name == "parcel":
df_summary = df_summary.rename(columns={"froi1": "parcel1"})
df_detail = df_detail.rename(columns={"froi1": "parcel1"})
if froi2_name == "parcel":
df_summary = df_summary.rename(columns={"froi2": "parcel2"})
df_detail = df_detail.rename(columns={"froi2": "parcel2"})
df_summary["subject"] = subject
df_detail["subject"] = subject
data_summary.append(df_summary)
data_detail.append(df_detail)
self._data_summary = pd.concat(data_summary, ignore_index=True)
self._data_detail = pd.concat(data_detail, ignore_index=True)
new_info = pd.DataFrame(
{
"task": [task],
"run_filter": [run_filter],
"space": [space],
"froi1": [self.froi1],
"froi2": [self.froi2],
"customized_froi1_run": [froi1_run_label],
"customized_froi2_run": [froi2_run_label],
"clean_config": [clean_config],
}
)
self._save(new_info)
return self._data_summary, self._data_detail
@staticmethod
def _resolve_task(
task: Optional[str],
froi1: Union[FROIConfig, str, ParcelsConfig],
froi2: Union[FROIConfig, str, ParcelsConfig],
) -> str:
if task is not None:
return task
tasks = []
if isinstance(froi1, FROIConfig):
tasks.append(froi1.task)
if isinstance(froi2, FROIConfig):
tasks.append(froi2.task)
tasks = sorted(set(tasks))
if len(tasks) == 1:
return tasks[0]
raise ValueError(
"Task could not be inferred. Please specify the task whose cleaned "
"preprocessed data should be used."
)
@staticmethod
def _resolve_mask_data(
froi: Union[FROIConfig, str, ParcelsConfig],
subject: str,
run_label: Optional[str],
):
is_parcels = not isinstance(froi, FROIConfig)
parcels_config = froi.parcels if isinstance(froi, FROIConfig) else froi
_, labels = get_parcels(parcels_config)
if is_parcels:
parcels_img, labels = get_parcels(froi)
if parcels_img is None:
raise ValueError("Parcels image not found.")
return labels, "parcel", True, parcels_img
if run_label is None:
run_label = "all"
froi_img = _get_froi_data(subject, froi, run_label, return_nifti=True)
if froi_img is None:
raise ValueError(
f"Data not found for subject {subject}, fROI {froi}, run {run_label}."
)
return labels, "froi", not is_no_parcels(froi.parcels), froi_img
@staticmethod
def _get_cleaned_imgs_by_run(
subject: str,
task: str,
run_filter,
space: Optional[str],
config: Dict,
) -> Tuple[List, List[str], List[Optional[str]]]:
prepared_runs = load_preprocessed_bold_for_fc(
subject,
task,
run_filter=run_filter,
space=space,
clean_surf=config["clean_surf"],
mask_suffix=config["mask_suffix"],
mask_fwhm=config["mask_fwhm"],
target_affine=config["target_affine"],
confounds_regex=config["confounds_regex"],
fd_threshold=config["fd_threshold"],
volume_fwhm=config["volume_fwhm"],
surface_fwhm=config["surface_fwhm"],
standardize=config["standardize"],
detrend=config["detrend"],
regress_out_task=config["regress_out_task"],
regress_task_conditions=config["regress_task_conditions"],
low_pass=config["low_pass"],
high_pass=config["high_pass"],
min_T=config["min_T"],
create_if_missing=True,
)
prepared_runs = FunctionalConnectivityEstimator._filter_preprocessed_runs_by_min_t(
prepared_runs,
config["min_T"],
)
selected_runs = FunctionalConnectivityEstimator._select_preprocessed_runs(
prepared_runs,
config["task_conditions"],
config["concat_conditions_across_runs"],
)
if len(selected_runs) == 0:
if config["task_conditions"] is not None:
raise ValueError(
"No usable runs remained for subject "
f"{subject} and task {task} after selecting task_conditions "
f"{config['task_conditions']}."
)
raise ValueError(
f"No usable preprocessed runs found for subject {subject} and task {task}."
)
cleaned_imgs = [prepared["cleaned_img"] for prepared in selected_runs]
cleaned_run_labels = [prepared["run_label"] for prepared in selected_runs]
cleaned_session_labels = [
prepared["session_label"] for prepared in selected_runs
]
return cleaned_imgs, cleaned_run_labels, cleaned_session_labels
@staticmethod
def _select_preprocessed_runs(
prepared_runs: List[Dict],
task_conditions: Optional[List[str]],
concat_conditions_across_runs: bool,
) -> List[Dict]:
if task_conditions is None:
return prepared_runs
selected_runs = []
for prepared in prepared_runs:
cleaned_img = prepared["cleaned_img"]
n_timepoints = _get_img_n_timepoints(cleaned_img)
selected_frames = FunctionalConnectivityEstimator._select_task_condition_frames(
prepared,
n_timepoints,
task_conditions,
)
if selected_frames is None or not np.any(selected_frames):
continue
selected_img = _subset_cleaned_img(cleaned_img, selected_frames)
selected_runs.append(
{
**prepared,
"cleaned_img": selected_img,
}
)
if concat_conditions_across_runs:
if len(selected_runs) == 0:
return []
return [FunctionalConnectivityEstimator._concatenate_prepared_runs(selected_runs)]
return selected_runs
@staticmethod
def _filter_preprocessed_runs_by_min_t(
prepared_runs: List[Dict],
min_T: int,
) -> List[Dict]:
filtered_runs = []
for prepared in prepared_runs:
n_timepoints = _get_img_n_timepoints(prepared["cleaned_img"])
if n_timepoints < min_T:
source_name = prepared.get(
"func_file",
prepared.get("func_files", {}).get("L"),
)
if source_name is not None:
warnings.warn(
f"Skipping {Path(source_name).name}: insufficient timepoints."
)
continue
filtered_runs.append(prepared)
return filtered_runs
@staticmethod
def _concatenate_prepared_runs(prepared_runs: List[Dict]) -> Dict:
first = prepared_runs[0]
cleaned_imgs = [prepared["cleaned_img"] for prepared in prepared_runs]
if is_surface_image(cleaned_imgs[0]):
concatenated_img = SurfaceImage(
mesh=cleaned_imgs[0].mesh,
data={
part_name: np.concatenate(
[
np.asarray(img.data.parts[part_name])
for img in cleaned_imgs
],
axis=-1,
)
for part_name in cleaned_imgs[0].data.parts
},
)
else:
concatenated_img = image.concat_imgs(
cleaned_imgs, auto_resample=True
)
run_labels = [prepared["run_label"] for prepared in prepared_runs]
session_labels = [
prepared["session_label"]
for prepared in prepared_runs
if prepared["session_label"] is not None
]
session_label = None
if len(session_labels) == 1:
session_label = session_labels[0]
elif len(session_labels) > 1:
session_label = "+".join(session_labels)
return {
**first,
"cleaned_img": concatenated_img,
"run_label": "concatenated:" + "+".join(run_labels),
"session_label": session_label,
}
@staticmethod
def _resolve_run_filter_labels(
subject: str,
task: str,
available_run_labels: List[str],
run_filter,
) -> List[str]:
normalized_run_filter = _normalize_run_filter(run_filter)
if normalized_run_filter is None or normalized_run_filter == "all":
return available_run_labels
if isinstance(normalized_run_filter, list):
requested_labels = set(normalized_run_filter)
return [
run_label
for run_label in available_run_labels
if run_label in requested_labels
]
if normalized_run_filter == "odd":
return [
run_label
for run_label in available_run_labels
if run_label.isdigit() and int(run_label) % 2 == 1
]
if normalized_run_filter == "even":
return [
run_label
for run_label in available_run_labels
if run_label.isdigit() and int(run_label) % 2 == 0
]
if (
isinstance(normalized_run_filter, str)
and normalized_run_filter.startswith("orth")
):
excluded_run = normalized_run_filter[4:]
return [
run_label
for run_label in available_run_labels
if run_label != excluded_run
]
if (
isinstance(normalized_run_filter, str)
and not normalized_run_filter.isdigit()
):
try:
run_groups = _get_run_group_info(subject, task)
except RuntimeError:
run_groups = {}
if normalized_run_filter in run_groups:
requested_labels = {
_normalize_run_filter_token(run_i)
for run_i in run_groups[normalized_run_filter]
}
return [
run_label
for run_label in available_run_labels
if run_label in requested_labels
]
return [
run_label
for run_label in available_run_labels
if run_label == normalized_run_filter
]
@staticmethod
def _finalize_run_records(
subject: str,
task: str,
run_records: List[Dict],
run_filter,
missing_label: str,
) -> List[Dict]:
for run_i, record in enumerate(run_records, start=1):
record["run_label"] = f"{run_i:02d}"
available_run_labels = [record["run_label"] for record in run_records]
selected_run_labels = set(
FunctionalConnectivityEstimator._resolve_run_filter_labels(
subject,
task,
available_run_labels,
run_filter,
)
)
run_records = [
record
for record in run_records
if record["run_label"] in selected_run_labels
]
if len(run_records) == 0:
normalized_run_filter = _normalize_run_filter(run_filter)
if normalized_run_filter is None:
raise ValueError(
f"No {missing_label} runs found for subject {subject} and task {task}."
)
raise ValueError(
f"No {missing_label} runs found for subject {subject}, task {task}, "
f"and run_filter {normalized_run_filter}."
)
return run_records
@staticmethod
def _find_preprocessed_runs(
subject: str,
task: str,
run_filter,
space: Optional[str],
mask_suffix: str,
) -> List[Dict]:
preproc_root = get_bids_preprocessed_folder()
bids_root = None
try:
bids_root = get_bids_data_folder()
except RuntimeError:
pass
filters = [("task", task)]
if space is not None:
filters.append(("space", space))
func_files = [
Path(path)
for path in get_bids_files(
main_path=preproc_root,
modality_folder="func",
file_tag="bold",
file_type="nii*",
sub_label=subject,
filters=filters,
)
if path.endswith("desc-preproc_bold.nii.gz")
]
if len(func_files) == 0:
raise ValueError(
f"No preprocessed BOLD runs found for subject {subject} and task {task}."
)
spaces_found = sorted(
{
match.group(1)
for path in func_files
for match in [SPACE_RE.search(path.name)]
if match is not None
}
)
if space is None and len(spaces_found) > 1:
raise ValueError(
"Multiple spaces were found for the requested task. Please specify `space`."
)
run_records = []
for func_file in func_files:
file_space_match = SPACE_RE.search(func_file.name)
file_space = (
file_space_match.group(1) if file_space_match is not None else None
)
if space is not None and file_space != space:
continue
session_match = SES_RE.search(func_file.name)
file_session = (
session_match.group(1) if session_match is not None else None
)
run_match = RUN_RE.search(func_file.name)
bids_run_label = (
run_match.group(1) if run_match is not None else None
)
confounds_file = _find_matching_bids_file(
func_file.parent,
func_file,
"_desc-confounds_timeseries.tsv",
)
if confounds_file is None or not confounds_file.exists():
warnings.warn(
f"Confounds file not found for {func_file.name}, skipping."
)
continue
if bids_root is None:
events_file = None
else:
if file_session is None:
events_dir = bids_root / f"sub-{subject}" / "func"
else:
events_dir = bids_root / f"sub-{subject}" / f"ses-{file_session}" / "func"
events_file = _find_matching_bids_file(
events_dir,
func_file,
"_events.tsv",
)
anat_dir = FunctionalConnectivityEstimator._find_anat_dir(
preproc_root, subject, file_session
)
mask_file, used_functional_mask = (
FunctionalConnectivityEstimator._find_mask_file(
anat_dir,
func_file,
subject,
file_session,
file_space,
mask_suffix,
)
)
if mask_file is None:
warnings.warn(
f"No matching mask found for {func_file.name}, skipping."
)
continue
if used_functional_mask:
warnings.warn(
f"Anatomical mask not found for {func_file.name}; using "
f"functional mask {mask_file.name} instead."
)
sidecar_path = FunctionalConnectivityEstimator._find_bold_sidecar(
func_file
)
if not sidecar_path.exists():
warnings.warn(
f"JSON sidecar not found for {func_file.name}, skipping."
)
continue
with open(sidecar_path, "r") as f:
sidecar = json.load(f)
TR = sidecar.get("RepetitionTime")
if TR is None:
warnings.warn(
f"RepetitionTime missing for {func_file.name}, skipping."
)
continue
run_records.append(
{
"func_file": func_file,
"confounds_file": confounds_file,
"events_file": events_file,
"mask_file": mask_file,
"TR": TR,
"StartTime": sidecar.get("StartTime", 0),
"sidecar": sidecar,
"bids_run_label": bids_run_label,
"session_label": file_session,
}
)
return FunctionalConnectivityEstimator._finalize_run_records(
subject,
task,
run_records,
run_filter,
"preprocessed BOLD",
)
@staticmethod
def _find_preprocessed_surface_runs(
subject: str,
task: str,
run_filter,
space: Optional[str],
) -> List[Dict]:
preproc_root = get_bids_preprocessed_folder()
bids_root = None
try:
bids_root = get_bids_data_folder()
except RuntimeError:
pass
if space is None:
raise ValueError(
"Surface functional connectivity requires an explicit `space`."
)
filters = [("task", task), ("space", space), ("hemi", "L")]
func_files = [
Path(path)
for path in get_bids_files(
main_path=preproc_root,
modality_folder="func",
file_tag="bold",
file_type="func.gii",
sub_label=subject,
filters=filters,
)
if path.endswith("desc-preproc_bold.func.gii")
]
if len(func_files) == 0:
raise ValueError(
f"No surface preprocessed BOLD runs found for subject {subject} and task {task}."
)
spaces_found = sorted(
{
match.group(1)
for path in func_files
for match in [SPACE_RE.search(path.name)]
if match is not None
}
)
run_records = []
for left_file in func_files:
file_space_match = SPACE_RE.search(left_file.name)
file_space = (
file_space_match.group(1)
if file_space_match is not None
else None
)
if file_space is None:
warnings.warn(
f"Space entity missing for {left_file.name}, skipping."
)
continue
if space is not None and file_space != space:
continue
right_file = Path(str(left_file).replace("_hemi-L_", "_hemi-R_"))
if not right_file.exists():
warnings.warn(
f"Right hemisphere file not found for {left_file.name}, skipping."
)
continue
session_match = SES_RE.search(left_file.name)
file_session = (
session_match.group(1)
if session_match is not None
else None
)
run_match = RUN_RE.search(left_file.name)
bids_run_label = (
run_match.group(1) if run_match is not None else None
)
confounds_file = _find_matching_bids_file(
left_file.parent,
left_file,
"_desc-confounds_timeseries.tsv",
)
if confounds_file is None or not confounds_file.exists():
warnings.warn(
f"Confounds file not found for {left_file.name}, skipping."
)
continue
if bids_root is None:
events_file = None
else:
if file_session is None:
events_dir = bids_root / f"sub-{subject}" / "func"
else:
events_dir = (
bids_root
/ f"sub-{subject}"
/ f"ses-{file_session}"
/ "func"
)
events_file = _find_matching_bids_file(
events_dir,
left_file,
"_events.tsv",
)
mesh_paths = _find_surface_mesh_paths(
preproc_root, subject, file_space
)
if mesh_paths is None:
warnings.warn(
f"No matching surface meshes found for {left_file.name}, skipping."
)
continue
sidecar_path = Path(str(left_file).replace(".func.gii", ".json"))
if not sidecar_path.exists():
warnings.warn(
f"JSON sidecar not found for {left_file.name}, skipping."
)
continue
with open(sidecar_path, "r") as f:
sidecar = json.load(f)
TR = sidecar.get("RepetitionTime")
if TR is None:
warnings.warn(
f"RepetitionTime missing for {left_file.name}, skipping."
)
continue
run_records.append(
{
"func_files": {"L": left_file, "R": right_file},
"mesh_paths": mesh_paths,
"confounds_file": confounds_file,
"events_file": events_file,
"TR": TR,
"StartTime": sidecar.get("StartTime", 0),
"sidecar": sidecar,
"bids_run_label": bids_run_label,
"session_label": file_session,
}
)
return FunctionalConnectivityEstimator._finalize_run_records(
subject,
task,
run_records,
run_filter,
"surface preprocessed BOLD",
)
def _find_anat_dir(
preproc_root: Path, subject: str, session: Optional[str]
) -> Path:
if session is not None:
anat_dir = preproc_root / f"sub-{subject}" / f"ses-{session}" / "anat"
if anat_dir.exists():
return anat_dir
anat_dir = preproc_root / f"sub-{subject}" / "anat"
if anat_dir.exists():
return anat_dir
raise FileNotFoundError(
f"No anatomical directory found for subject {subject}."
)
@staticmethod
def _find_mask_file(
anat_dir: Path,
func_file: Path,
subject: str,
session: Optional[str],
space: Optional[str],
mask_suffix: str,
) -> Tuple[Optional[Path], bool]:
ses_str = f"_ses-{session}" if session is not None else ""
patterns = []
if space in {None, "T1w"}:
patterns.append(f"sub-{subject}{ses_str}{mask_suffix}")
if space is not None and space != "T1w":
patterns.append(f"sub-{subject}{ses_str}_space-{space}{mask_suffix}")
patterns.append(f"sub-{subject}{ses_str}_space-{space}_res-*{mask_suffix}")
for pattern in patterns:
matches = sorted(anat_dir.glob(pattern))
if len(matches) != 0:
return matches[0], False
func_mask_name = func_file.name.replace(
"_desc-preproc_bold.nii.gz", mask_suffix
)
func_mask_file = func_file.parent / func_mask_name
if func_mask_file.exists():
return func_mask_file, True
return None, False
@staticmethod
def _find_bold_sidecar(func_file: Path) -> Path:
exact_match = func_file.parent / func_file.name.replace(".nii.gz", ".json")
if exact_match.exists():
return exact_match
candidate = _find_matching_bids_file(
func_file.parent,
func_file,
"_bold.json",
required_entities=(
"sub",
"ses",
"task",
"acq",
"ce",
"dir",
"rec",
"run",
"space",
),
)
if candidate is not None:
return candidate
return exact_match
@staticmethod
def _clean_run(record: Dict, config: Dict):
confounds_raw = pd.read_csv(record["confounds_file"], sep="\t")
confounds = confounds_raw.filter(regex=config["confounds_regex"])
extra_confounds = []
if (
"framewise_displacement" in confounds_raw.columns
and config["fd_threshold"] is not None
):
fd_values = confounds_raw["framewise_displacement"].fillna(0)
for idx, is_outlier in enumerate(fd_values > config["fd_threshold"]):
if is_outlier:
outlier = np.zeros(len(confounds_raw))
outlier[idx] = 1
extra_confounds.append(
pd.DataFrame({f"fd_outlier_{idx:04d}": outlier})
)
if (
config["regress_out_task"]
and record["events_file"] is not None
and Path(record["events_file"]).exists()
):
extra_confounds.extend(
FunctionalConnectivityEstimator._build_task_regressors(
record["events_file"],
record["sidecar"],
record["TR"],
record["StartTime"],
len(confounds_raw),
task_conditions=config["regress_task_conditions"],
)
)
all_confounds = [confounds] + extra_confounds
confounds = pd.concat(all_confounds, axis=1).fillna(0)
mask_img = load_img(record["mask_file"])
mask_img = image.new_img_like(
mask_img, image.get_data(mask_img).astype(np.float32)
)
if config["mask_fwhm"] is not None:
mask_img = image.smooth_img(mask_img, fwhm=config["mask_fwhm"])
if config["target_affine"] is not None:
mask_img = image.resample_img(
mask_img,
target_affine=np.asarray(config["target_affine"]),
interpolation="linear",
copy_header=True,
force_resample=True,
)
mask_img = image.math_img("img > 0", img=mask_img)
mask_img = image.crop_img(mask_img, copy_header=True)
func_img = load_img(record["func_file"])
if len(func_img.shape) <= 3 or func_img.shape[3] < config["min_T"]:
warnings.warn(
f"Skipping {record['func_file'].name}: insufficient timepoints."
)
return None
if config["target_affine"] is not None:
func_img = image.resample_to_img(
func_img, mask_img, copy_header=True, force_resample=True
)
masker = maskers.NiftiMasker(
mask_img=mask_img,
standardize=config["standardize"],
detrend=config["detrend"],
t_r=record["TR"],
smoothing_fwhm=config["volume_fwhm"],
low_pass=config["low_pass"],
high_pass=config["high_pass"],
)
cleaned_data = _masker_fit_transform_with_explicit_mask(
masker,
func_img,
confounds,
)
return masker.inverse_transform(cleaned_data)
@staticmethod
def _clean_surface_run(record: Dict, config: Dict):
confounds_raw = pd.read_csv(record["confounds_file"], sep="\t")
confounds = confounds_raw.filter(regex=config["confounds_regex"])
extra_confounds = []
if (
"framewise_displacement" in confounds_raw.columns
and config["fd_threshold"] is not None
):
fd_values = confounds_raw["framewise_displacement"].fillna(0)
for idx, is_outlier in enumerate(fd_values > config["fd_threshold"]):
if is_outlier:
outlier = np.zeros(len(confounds_raw))
outlier[idx] = 1
extra_confounds.append(
pd.DataFrame({f"fd_outlier_{idx:04d}": outlier})
)
if (
config["regress_out_task"]
and record["events_file"] is not None
and Path(record["events_file"]).exists()
):
extra_confounds.extend(
FunctionalConnectivityEstimator._build_task_regressors(
record["events_file"],
record["sidecar"],
record["TR"],
record["StartTime"],
len(confounds_raw),
task_conditions=config["regress_task_conditions"],
)
)
all_confounds = [confounds] + extra_confounds
confounds = pd.concat(all_confounds, axis=1).fillna(0)
confounds_arg = confounds if confounds.shape[1] != 0 else None
func_img = load_surface_image(record["func_files"], record["mesh_paths"])
cleaned_parts = {}
n_timepoints = None
for hemi in SURFACE_HEMIS:
part_name = SURFACE_PARTS[hemi]
hemi_data = np.asarray(
func_img.data.parts[part_name], dtype=np.float32
)
if hemi_data.ndim == 1:
hemi_data = hemi_data[:, None]
if n_timepoints is None:
n_timepoints = hemi_data.shape[-1]
elif hemi_data.shape[-1] != n_timepoints:
raise ValueError(
"Surface hemispheres must share the same number of timepoints."
)
if n_timepoints < config["min_T"]:
warnings.warn(
"Skipping surface run: insufficient timepoints."
)
return None
hemi_data = _smooth_surface_array(
hemi_data,
func_img.mesh.parts[part_name],
config["surface_fwhm"],
)
cleaned = signal.clean(
hemi_data.T,
confounds=confounds_arg,
detrend=config["detrend"],
standardize=config["standardize"],
low_pass=config["low_pass"],
high_pass=config["high_pass"],
t_r=record["TR"],
)
cleaned_parts[part_name] = cleaned.T.astype(np.float32)
return SurfaceImage(
mesh=func_img.mesh,
data=cleaned_parts,
)
@staticmethod
def _build_task_regressors(
events_file: Path,
sidecar: Dict,
TR: float,
start_time: float,
n_timepoints: int,
task_conditions: Optional[List[str]] = None,
) -> List[pd.DataFrame]:
events = pd.read_csv(events_file, sep="\t")
if "trial_type" not in events.columns:
events["trial_type"] = "dummy"
if task_conditions is not None:
events = events[events["trial_type"].isin(task_conditions)].copy()
if events.shape[0] == 0:
return []
events["trial_type"] = "selected_task"
events = events[["trial_type", "onset", "duration"]]
dummies = pd.get_dummies(
events["trial_type"], prefix="trial_type", prefix_sep="."
)
events = pd.concat([dummies, events[["onset", "duration"]]], axis=1)
trial_cols = [col for col in events.columns if col.startswith("trial_type")]
frame_times = _frame_times_from_sidecar(
sidecar, TR, start_time, n_timepoints
)
regressors = []
for trial_col in trial_cols:
regressor, _ = glm.first_level.compute_regressor(
(
events["onset"],
events["duration"],
events[trial_col],
),
"glover + derivative",
frame_times,
)
if regressor.shape[1] == 1:
column_names = [trial_col]
else:
column_names = [trial_col, f"{trial_col}_derivative"]
regressors.append(pd.DataFrame(regressor, columns=column_names))
return regressors
@staticmethod
def _select_task_condition_frames(
record: Dict,
n_timepoints: int,
task_conditions: Optional[List[str]],
) -> Optional[np.ndarray]:
if task_conditions is None:
return None
events_file = record["events_file"]
if events_file is None or not Path(events_file).exists():
warnings.warn(
"Skipping run because task condition selection was requested "
"but no events file is available."
)
return None
events = pd.read_csv(events_file, sep="\t")
if "trial_type" not in events.columns:
warnings.warn(
f"Skipping {Path(events_file).name}: events file does not "
"contain a trial_type column."
)
return None
events = events[events["trial_type"].isin(task_conditions)].copy()
if events.shape[0] == 0:
warnings.warn(
f"Skipping {Path(events_file).name}: none of the requested "
"task conditions were found."
)
return None
frame_times = _frame_times_from_sidecar(
record["sidecar"],
record["TR"],
record["StartTime"],
n_timepoints,
)
frame_starts = frame_times - (record["TR"] / 2.0)
frame_ends = frame_times + (record["TR"] / 2.0)
selected = np.zeros(n_timepoints, dtype=bool)
for _, event in events.iterrows():
onset = float(event["onset"])
duration = float(event["duration"])
event_end = onset + max(duration, 0.0)
selected |= (frame_starts <= event_end) & (frame_ends > onset)
if not np.any(selected):
warnings.warn(
f"Skipping {Path(events_file).name}: none of the requested "
"task conditions overlapped any sampled frames."
)
return None
return selected
@staticmethod
def _run(cleaned_imgs: List, froi1_img, froi2_img) -> Tuple[pd.DataFrame, pd.DataFrame]:
if len(cleaned_imgs) == 0:
raise ValueError("No cleaned runs available for connectivity analysis.")
detail_rows = []
resampled_masks = {}
for run_idx, cleaned_img in enumerate(cleaned_imgs):
if is_surface_image(cleaned_img):
if not (
is_surface_image(froi1_img) and is_surface_image(froi2_img)
):
raise ValueError(
"Surface connectivity requires surface ROI masks."
)
time_series = np.concatenate(
[
np.asarray(
cleaned_img.data.parts[SURFACE_PARTS[hemi]]
)
for hemi in SURFACE_HEMIS
],
axis=0,
).T
froi1_masks = flatten_image_data(froi1_img)
froi2_masks = flatten_image_data(froi2_img)
if (
time_series.shape[1] != froi1_masks.size
or time_series.shape[1] != froi2_masks.size
):
raise ValueError(
"Surface ROI masks must match the cleaned data vertex count."
)
else:
cleaned_data = cleaned_img.get_fdata()
if cleaned_data.ndim != 4:
raise ValueError("Cleaned functional image must be 4D.")
time_series = cleaned_data.reshape(
(-1, cleaned_data.shape[-1])
).T
cache_key = (
cleaned_img.shape[:3],
tuple(cleaned_img.affine.flatten()),
)
if cache_key not in resampled_masks:
froi1_resampled = image.resample_to_img(
froi1_img,
image.index_img(cleaned_img, 0),
interpolation="nearest",
copy_header=True,
force_resample=True,
).get_fdata().flatten()
froi2_resampled = image.resample_to_img(
froi2_img,
image.index_img(cleaned_img, 0),
interpolation="nearest",
copy_header=True,
force_resample=True,
).get_fdata().flatten()
resampled_masks[cache_key] = (
froi1_resampled,
froi2_resampled,
)
froi1_masks, froi2_masks = resampled_masks[cache_key]
froi1_labels = np.unique(froi1_masks)
froi1_labels = froi1_labels[
~np.isnan(froi1_labels) & (froi1_labels != 0)
]
froi2_labels = np.unique(froi2_masks)
froi2_labels = froi2_labels[
~np.isnan(froi2_labels) & (froi2_labels != 0)
]
froi1_roi_masks = np.stack(
[(froi1_masks == label) for label in froi1_labels], axis=0
)
froi2_roi_masks = np.stack(
[(froi2_masks == label) for label in froi2_labels], axis=0
)
ts1 = np.array(
[np.nanmean(time_series[:, mask], axis=1) for mask in froi1_roi_masks]
)
ts2 = np.array(
[np.nanmean(time_series[:, mask], axis=1) for mask in froi2_roi_masks]
)
for i, label1 in enumerate(froi1_labels):
for j, label2 in enumerate(froi2_labels):
valid = ~np.isnan(ts1[i]) & ~np.isnan(ts2[j])
if np.sum(valid) < 2:
fisher_z = np.nan
else:
corr = np.corrcoef(ts1[i, valid], ts2[j, valid])[0, 1]
corr = np.clip(
corr,
-1 + np.finfo(float).eps,
1 - np.finfo(float).eps,
)
fisher_z = np.arctanh(corr)
detail_rows.append(
{
"run": run_idx,
"froi1": label1,
"froi2": label2,
"fisher_z": fisher_z,
}
)
df_detail = pd.DataFrame(detail_rows)
df_summary = (
df_detail.groupby(["froi1", "froi2"])
.agg({"fisher_z": "mean"})
.reset_index()
)
return df_summary, df_detail