Source code for funROI.first_level.nilearn

import json
import os
import re
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import nibabel as nib
import numpy as np
import pandas as pd
from nilearn.glm import expression_to_contrast_vector
from nilearn.glm.first_level import (
    FirstLevelModel,
    first_level_from_bids,
    make_first_level_design_matrix,
)
from nilearn.image import load_img, new_img_like
from nilearn.interfaces.bids.query import get_bids_files
from nilearn.interfaces.fmriprep import load_confounds
from nilearn.surface import SurfaceImage
from scipy import sparse

from .._surface import (
    SURFACE_HEMIS,
    SURFACE_PARTS,
    is_surface_image,
    load_surface_image,
    load_surface_numeric_data,
    save_surface_image,
)
from ..contrast import (
    _get_contrast_folder,
    _get_contrast_path,
    _get_design_matrix_path,
    _get_residuals_path,
    _get_run_group_info_path,
    _get_surface_contrast_path,
    _get_surface_residuals_path,
)
from ..settings import (
    get_bids_data_folder,
    get_bids_preprocessed_folder,
    get_bids_preprocessed_folder_relative,
)
from .utils import _register_contrast


IMAGE_SUFFIXES = {
    "z_score": "z",
    "effect_size": "effect",
    "effect_variance": "variance",
    "stat": "t",
    "p_value": "p",
}


ENTITY_RE = re.compile(r"(^|_)([A-Za-z0-9]+)-([^_]+)")


def _compute_contrast_safely(model, contrast_vector):
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message=r"Contrast for run \d+ is null\.",
            category=UserWarning,
        )
        return model.compute_contrast(
            contrast_vector, stat_type="t", output_type="all"
        )


def _compute_contrast_volume(
    sub, task, run, model, contrast_name, contrast_vector
):
    contrast_imgs = _compute_contrast_safely(model, np.array(contrast_vector))
    for image_type, suffix in IMAGE_SUFFIXES.items():
        contrast_imgs[image_type].to_filename(
            _get_contrast_path(sub, task, run, contrast_name, suffix)
        )


def _save_image(img, volume_path: Path, surface_paths: Dict[str, Path]) -> None:
    if is_surface_image(img):
        save_surface_image(img, surface_paths)
        return
    img.to_filename(volume_path)


def _compute_contrast_surface(
    sub, task, run, model, contrast_name, contrast_vector
):
    contrast_imgs = _compute_contrast_safely(model, contrast_vector)
    for image_type, suffix in IMAGE_SUFFIXES.items():
        _save_image(
            contrast_imgs[image_type],
            _get_contrast_path(sub, task, run, contrast_name, suffix),
            {
                hemi: _get_surface_contrast_path(
                    sub, task, run, contrast_name, suffix, hemi
                )
                for hemi in SURFACE_HEMIS
            },
        )


def _validate_run_groups(
    run_groups: Optional[Dict[str, List[int]]], n_runs: int
) -> Dict[str, List[str]]:
    if run_groups is None:
        return {}

    reserved_labels = {"all", "odd", "even"}
    normalized_groups = {}
    for label, run_ids in run_groups.items():
        if label in reserved_labels or label.isdigit() or label.startswith("orth"):
            raise ValueError(
                f"Invalid custom run group name '{label}'. "
                "Custom run groups cannot reuse built-in run labels."
            )
        if len(run_ids) == 0:
            raise ValueError(
                f"Custom run group '{label}' must include at least one run."
            )
        if len(set(run_ids)) != len(run_ids):
            raise ValueError(
                f"Custom run group '{label}' contains duplicate run ids."
            )
        if min(run_ids) < 1 or max(run_ids) > n_runs:
            raise ValueError(
                f"Custom run group '{label}' includes an invalid run id. "
                f"Expected 1-indexed run ids between 1 and {n_runs}."
            )
        normalized_groups[label] = [f"{run_id:02d}" for run_id in run_ids]
    return normalized_groups


def _build_run_group_summary(
    run_labels: List[str],
    orthogs: Optional[List[str]],
    custom_run_groups: Dict[str, List[str]],
) -> pd.DataFrame:
    records = [
        {
            "run_label": run_label,
            "runs": [run_label],
            "n_runs": 1,
            "group_type": "single-run",
        }
        for run_label in run_labels
    ]
    records.append(
        {
            "run_label": "all",
            "runs": run_labels,
            "n_runs": len(run_labels),
            "group_type": "builtin",
        }
    )

    if len(run_labels) > 1 and orthogs is not None:
        if "odd-even" in orthogs:
            for group_label, rem in [("odd", 1), ("even", 0)]:
                runs = [
                    run_label
                    for run_label in run_labels
                    if int(run_label) % 2 == rem
                ]
                if len(runs) != 0:
                    records.append(
                        {
                            "run_label": group_label,
                            "runs": runs,
                            "n_runs": len(runs),
                            "group_type": "builtin",
                        }
                    )
        if "all-but-one" in orthogs:
            for run_label in run_labels:
                records.append(
                    {
                        "run_label": f"orth{run_label}",
                        "runs": [
                            other_run
                            for other_run in run_labels
                            if other_run != run_label
                        ],
                        "n_runs": len(run_labels) - 1,
                        "group_type": "builtin",
                    }
                )

    for group_label, runs in custom_run_groups.items():
        records.append(
            {
                "run_label": group_label,
                "runs": runs,
                "n_runs": len(runs),
                "group_type": "custom",
            }
        )
    return pd.DataFrame(records)


def _mesh_laplacian(mesh) -> sparse.csr_matrix:
    coordinates = np.asarray(mesh.coordinates, dtype=float)
    faces = np.asarray(mesh.faces, dtype=int)
    if faces.ndim != 2 or faces.shape[1] != 3:
        raise ValueError("Surface smoothing expects triangular faces.")

    edge_pairs = np.vstack(
        [
            faces[:, [0, 1]],
            faces[:, [1, 2]],
            faces[:, [2, 0]],
        ]
    )
    edge_pairs = np.sort(edge_pairs, axis=1)
    edge_pairs = np.unique(edge_pairs, axis=0)
    edge_lengths = np.linalg.norm(
        coordinates[edge_pairs[:, 0]] - coordinates[edge_pairs[:, 1]], axis=1
    )
    edge_lengths[edge_lengths == 0] = 1.0
    weights = 1.0 / (edge_lengths**2)

    adjacency = sparse.csr_matrix(
        (weights, (edge_pairs[:, 0], edge_pairs[:, 1])),
        shape=(len(coordinates), len(coordinates)),
    )
    adjacency = adjacency + adjacency.T
    degree = np.asarray(adjacency.sum(axis=1)).ravel()
    return sparse.diags(degree) - adjacency


def _smooth_surface_array(
    data: np.ndarray, mesh, fwhm: Optional[float]
) -> np.ndarray:
    data = np.asarray(data, dtype=np.float32)
    if fwhm is None or fwhm <= 0:
        return data.copy()

    sigma = float(fwhm) / np.sqrt(8.0 * np.log(2.0))
    if sigma == 0:
        return data.copy()

    laplacian = _mesh_laplacian(mesh)
    max_degree = float(laplacian.diagonal().max(initial=0.0))
    if max_degree == 0:
        return data.copy()

    total_time = (sigma**2) / 2.0
    n_steps = max(1, int(np.ceil(total_time * max_degree / 0.49)))
    step_size = total_time / n_steps

    smoothed = data.astype(np.float64, copy=True)
    for _ in range(n_steps):
        smoothed = smoothed - step_size * laplacian.dot(smoothed)
    return smoothed.astype(np.float32)


def _uses_surface_space(
    derivatives_root: Path, task: str, space: Optional[str]
) -> bool:
    if space is None:
        return False
    for path in derivatives_root.glob("sub-*/func/*.func.gii"):
        name = path.name
        if (
            f"task-{task}" in name
            and "hemi-L" in name
            and f"space-{space}" in name
            and name.endswith("_bold.func.gii")
        ):
            return True
    return False


def _parse_bids_entity_map(pathlike) -> Dict[str, str]:
    stem = Path(pathlike).name
    for suffix in (".nii.gz", ".func.gii", ".surf.gii", ".tsv", ".json"):
        if stem.endswith(suffix):
            stem = stem[: -len(suffix)]
            break

    entities = {}
    for match in ENTITY_RE.finditer(stem):
        entities[match.group(2)] = match.group(3)
    return entities


def _find_matching_bids_file(
    directory: Path,
    reference_file: Path,
    suffix: str,
    *,
    required_entities: Optional[Tuple[str, ...]] = None,
) -> Optional[Path]:
    if not directory.exists():
        return None

    if required_entities is None:
        required_entities = (
            "sub",
            "ses",
            "task",
            "acq",
            "ce",
            "dir",
            "rec",
            "run",
        )

    reference_entities = _parse_bids_entity_map(reference_file)
    reference_entities = {
        key: reference_entities[key]
        for key in required_entities
        if key in reference_entities
    }

    matches = []
    for candidate in sorted(directory.glob(f"*{suffix}")):
        candidate_entities = _parse_bids_entity_map(candidate)
        if all(
            candidate_entities.get(key) == value
            for key, value in reference_entities.items()
        ):
            matches.append(candidate)

    if len(matches) == 0:
        return None
    return matches[0]


def _parse_bids_entities(pathlike) -> Optional[Dict[str, str]]:
    filename = Path(pathlike).name
    pattern = (
        r"sub-(?P<subject>[^_]+)_task-(?P<task>[^_]+)"
        r"(?:_run-(?P<run>[^_]+))?"
        r"(?:_acq-(?P<acq>[^_]+))?"
    )
    match = re.search(pattern, filename)
    if match is None:
        return None
    return match.groupdict()


def _find_surface_run_paths(
    derivatives_root: Path,
    subject: str,
    task: str,
    run_label: Optional[str],
    acquisition: Optional[str],
    space: str,
) -> Optional[Dict[str, Path]]:
    func_dir = derivatives_root / f"sub-{subject}" / "func"
    surface_paths = {}
    for hemi in SURFACE_HEMIS:
        matches = []
        for path in func_dir.glob("*.func.gii"):
            name = path.name
            if f"sub-{subject}" not in name or f"task-{task}" not in name:
                continue
            if run_label and f"run-{run_label}" not in name:
                continue
            if acquisition and f"acq-{acquisition}" not in name:
                continue
            if f"hemi-{hemi}" not in name or f"space-{space}" not in name:
                continue
            if not name.endswith("_bold.func.gii"):
                continue
            matches.append(str(path))
        matches = sorted(matches)
        if len(matches) == 0:
            return None
        surface_paths[hemi] = Path(matches[0])
    return surface_paths


def _find_surface_mesh_paths(
    derivatives_root: Path, subject: str, space: str
) -> Optional[Dict[str, Path]]:
    anat_dir = derivatives_root / f"sub-{subject}" / "anat"
    mesh_paths = {}
    for hemi in SURFACE_HEMIS:
        candidates = [
            anat_dir
            / f"sub-{subject}_hemi-{hemi}_space-{space}_midthickness.surf.gii",
            anat_dir / f"sub-{subject}_hemi-{hemi}_space-{space}_pial.surf.gii",
            anat_dir / f"sub-{subject}_hemi-{hemi}_midthickness.surf.gii",
            anat_dir / f"sub-{subject}_hemi-{hemi}_pial.surf.gii",
        ]
        for candidate in candidates:
            if candidate.exists():
                mesh_paths[hemi] = candidate
                break
        else:
            return None
    return mesh_paths


def _infer_bootstrap_volume_space(
    derivatives_root: Path, task: str
) -> Optional[str]:
    spaces = []
    for path in derivatives_root.glob("sub-*/func/*_bold.nii*"):
        name = path.name
        if f"task-{task}" not in name:
            continue
        match = re.search(r"_space-([^_]+)", name)
        if match is not None:
            spaces.append(match.group(1))

    unique_spaces = set(spaces)
    for preferred_space in ["MNINonLinear", "MNI152NLin2009cAsym"]:
        if preferred_space in unique_spaces:
            return preferred_space
    if len(unique_spaces) > 0:
        return sorted(unique_spaces)[0]
    return None


def _split_first_level_bids_kwargs(
    model_kwargs: Dict,
) -> Tuple[Optional[Dict], Dict]:
    defaults = {
        "strategy": ("motion", "high_pass", "wm_csf"),
        "motion": "full",
        "scrub": 5,
        "fd_threshold": 0.2,
        "std_dvars_threshold": 3,
        "wm_csf": "basic",
        "global_signal": "basic",
        "compcor": "anat_combined",
        "n_compcor": "all",
        "ica_aroma": "full",
        "demean": True,
    }

    model_kwargs = dict(model_kwargs)
    if model_kwargs.get("confounds_strategy") is None:
        unknown_confounds_kwargs = {
            key: value
            for key, value in model_kwargs.items()
            if key.startswith("confounds_")
        }
        if len(unknown_confounds_kwargs) > 0:
            raise RuntimeError(
                "Unknown keyword arguments. Keyword arguments should start "
                f"with `confounds_` prefix: {unknown_confounds_kwargs}"
            )
        return None, model_kwargs

    kwargs_load_confounds = {}
    for key, default_value in defaults.items():
        confounds_key = f"confounds_{key}"
        if confounds_key in model_kwargs:
            kwargs_load_confounds[key] = model_kwargs.pop(confounds_key)
        else:
            kwargs_load_confounds[key] = default_value

    unknown_confounds_kwargs = {
        key: value
        for key, value in model_kwargs.items()
        if key.startswith("confounds_")
    }
    if len(unknown_confounds_kwargs) > 0:
        raise RuntimeError(
            "Unknown keyword arguments. Keyword arguments should start with "
            f"`confounds_` prefix: {unknown_confounds_kwargs}"
        )

    return kwargs_load_confounds, model_kwargs


def _append_scrub_outlier_regressors(
    confounds_df: pd.DataFrame,
    img_files,
    fd_threshold: Optional[float],
    std_dvars_threshold: Optional[float],
) -> pd.DataFrame:
    if fd_threshold is None and std_dvars_threshold is None:
        return confounds_df

    scrub_fd_threshold = np.inf if fd_threshold is None else fd_threshold
    scrub_std_dvars_threshold = (
        np.inf if std_dvars_threshold is None else std_dvars_threshold
    )
    scrub_confounds, masks = load_confounds(
        img_files,
        fd_threshold=scrub_fd_threshold,
        std_dvars_threshold=scrub_std_dvars_threshold,
        strategy=["scrub"],
    )
    if isinstance(scrub_confounds, list):
        scrub_confounds = scrub_confounds[0]
        masks = masks[0]

    confounds_df = confounds_df.copy()
    if masks is not None:
        outlier_indexes = set(scrub_confounds.index) - set(masks)
    else:
        outlier_indexes = {}
    for outlier_index in outlier_indexes:
        confounds_df[f"outlier_index_{outlier_index}"] = (
            np.arange(len(confounds_df)) == outlier_index
        ).astype(int)
    return confounds_df


def _collect_surface_bids_inputs(
    bids_data_folder: Path,
    derivatives_root: Path,
    task: str,
    subjects: Optional[List[str]],
    space: str,
    data_filter: Optional[List[Tuple[str, str]]],
    fd_threshold: Optional[float],
    std_dvars_threshold: Optional[float],
    model_kwargs: Dict,
):
    model_kwargs = dict(model_kwargs)
    kwargs_load_confounds, model_kwargs = _split_first_level_bids_kwargs(
        model_kwargs
    )
    filters = [("task", task), ("space", space), ("hemi", "L")]
    if data_filter is not None:
        filters.extend(data_filter)

    def _surface_func_files_for_subject(
        subject: Optional[str],
    ) -> List[Path]:
        return [
            Path(path)
            for path in get_bids_files(
                main_path=derivatives_root,
                modality_folder="func",
                file_tag="bold",
                file_type="func.gii",
                sub_label=subject,
                filters=filters,
            )
            if path.endswith("desc-preproc_bold.func.gii")
        ]

    if subjects is None:
        all_func_files = _surface_func_files_for_subject(None)
        if len(all_func_files) == 0:
            raise ValueError(
                f"No surface preprocessed BOLD runs found for task {task} "
                f"in space {space}."
            )
        subject_order = sorted(
            {
                entities["sub"]
                for path in all_func_files
                for entities in [_parse_bids_entity_map(path)]
                if "sub" in entities
            }
        )
    else:
        subject_order = subjects

    models = []
    models_run_imgs = []
    models_events = []
    models_confounds = []

    for subject in subject_order:
        func_files = _surface_func_files_for_subject(subject)
        if len(func_files) == 0:
            raise ValueError(
                f"No surface preprocessed BOLD runs found for subject "
                f"{subject} and task {task}."
            )

        subject_mesh_paths = _find_surface_mesh_paths(
            derivatives_root, subject, space
        )
        if subject_mesh_paths is None:
            raise ValueError(
                f"Could not find surface meshes for subject {subject} in "
                f"space {space}."
            )

        run_specs = []
        subject_events = []
        subject_confounds = []
        trs = []

        for left_file in func_files:
            right_file = Path(
                str(left_file).replace("_hemi-L_", "_hemi-R_")
            )
            if not right_file.exists():
                warnings.warn(
                    f"Right hemisphere file not found for {left_file.name}, "
                    "skipping."
                )
                continue

            file_entities = _parse_bids_entity_map(left_file)
            file_session = file_entities.get("ses")

            confounds_file = _find_matching_bids_file(
                left_file.parent,
                left_file,
                "_desc-confounds_timeseries.tsv",
            )
            if confounds_file is None or not confounds_file.exists():
                warnings.warn(
                    f"Confounds file not found for {left_file.name}, "
                    "skipping."
                )
                continue

            if file_session is None:
                events_dir = bids_data_folder / f"sub-{subject}" / "func"
            else:
                events_dir = (
                    bids_data_folder / f"sub-{subject}" / f"ses-{file_session}" / "func"
                )
            events_file = _find_matching_bids_file(
                events_dir, left_file, "_events.tsv"
            )
            if events_file is None:
                events_file = _find_matching_bids_file(
                    left_file.parent, left_file, "_events.tsv"
                )
            if events_file is None or not events_file.exists():
                warnings.warn(
                    f"Events file not found for {left_file.name}, skipping."
                )
                continue

            sidecar_path = Path(str(left_file).replace(".func.gii", ".json"))
            if not sidecar_path.exists():
                warnings.warn(
                    f"JSON sidecar not found for {left_file.name}, skipping."
                )
                continue

            with open(sidecar_path, "r") as f:
                sidecar = json.load(f)
            TR = sidecar.get("RepetitionTime")
            if TR is None:
                warnings.warn(
                    f"RepetitionTime missing for {left_file.name}, skipping."
                )
                continue

            subject_events.append(pd.read_csv(events_file, sep="\t"))
            img_file_pair = [str(left_file), str(right_file)]
            if kwargs_load_confounds is None:
                confounds_df = pd.read_csv(confounds_file, sep="\t")
            else:
                confounds_df, _ = load_confounds(
                    img_file_pair, **kwargs_load_confounds
                )
                if isinstance(confounds_df, list):
                    confounds_df = confounds_df[0]
                if confounds_df is None:
                    confounds_df = pd.DataFrame()
            if fd_threshold is not None or std_dvars_threshold is not None:
                confounds_df = _append_scrub_outlier_regressors(
                    confounds_df,
                    img_file_pair,
                    fd_threshold,
                    std_dvars_threshold,
                )
            run_specs.append(str(left_file))
            subject_confounds.append(confounds_df)
            trs.append(float(TR))

        if len(run_specs) == 0:
            raise ValueError(
                f"No usable surface preprocessed BOLD runs found for subject "
                f"{subject} and task {task}."
            )

        subject_tr = trs[0]
        if any(not np.isclose(tr, subject_tr) for tr in trs[1:]):
            raise ValueError(
                f"Surface runs for subject {subject} and task {task} do not "
                "share a consistent RepetitionTime."
            )

        surface_model_kwargs = dict(model_kwargs)
        surface_model_kwargs.setdefault("t_r", subject_tr)
        surface_model_kwargs.setdefault("subject_label", subject)
        surface_model_kwargs["minimize_memory"] = False
        model = FirstLevelModel(**surface_model_kwargs)
        models.append(model)
        models_run_imgs.append(run_specs)
        models_events.append(subject_events)
        models_confounds.append(subject_confounds)

    return models, models_run_imgs, models_events, models_confounds


def _load_surface_run_image(
    surface_run_paths: Dict[str, Path], mesh_paths: Dict[str, Path]
) -> SurfaceImage:
    return load_surface_image(surface_run_paths, mesh_paths)


def _load_run_image(run_spec, space: Optional[str], derivatives_root: Path):
    if is_surface_image(run_spec) or hasattr(run_spec, "get_fdata"):
        return run_spec

    run_path = Path(os.path.realpath(run_spec))
    if space is not None:
        entities = _parse_bids_entities(run_path)
        if entities is not None:
            surface_run_paths = _find_surface_run_paths(
                derivatives_root,
                entities["subject"],
                entities["task"],
                entities["run"],
                entities["acq"],
                space,
            )
            mesh_paths = _find_surface_mesh_paths(
                derivatives_root, entities["subject"], space
            )
            if surface_run_paths is not None and mesh_paths is not None:
                return _load_surface_run_image(surface_run_paths, mesh_paths)
    return load_img(run_path)


def _zero_run_contrast_vectors(run_contrast_vectors):
    return [np.zeros_like(vector) for vector in run_contrast_vectors]


def _run_subject_volume_first_level(
    subject,
    task,
    model,
    run_imgs,
    events,
    confounds,
    contrasts,
    orthogs,
    run_groups,
):
    if confounds is not None and not isinstance(confounds, list):
        confounds = [confounds]

    contrasts_folder = _get_contrast_folder(subject, task)
    contrasts_folder.mkdir(parents=True, exist_ok=True)

    run_img_grand = np.concatenate(
        [img.get_fdata() for img in run_imgs], axis=-1
    )
    run_img_grand = new_img_like(run_imgs[0], run_img_grand)
    design_matrices = []
    for run_i in range(1, len(run_imgs) + 1):
        events_i = events[run_i - 1]
        imgs_i = run_imgs[run_i - 1]
        frame_times = np.arange(imgs_i.shape[-1]) * model.t_r
        design_matrix = make_first_level_design_matrix(
            frame_times=frame_times,
            events=events_i,
            hrf_model=model.hrf_model,
            drift_model=model.drift_model,
            high_pass=model.high_pass,
            drift_order=model.drift_order,
            fir_delays=model.fir_delays,
            min_onset=model.min_onset,
        )

        if confounds is not None:
            design_matrix = pd.concat(
                [
                    design_matrix.reset_index(drop=True),
                    confounds[run_i - 1].reset_index(drop=True),
                ],
                axis=1,
            )

        design_matrix.columns = [
            f"run-{run_i:02d}_{col}" for col in design_matrix.columns
        ]
        design_matrices.append(design_matrix)

    design_matrix = pd.concat(design_matrices, axis=0)
    design_matrix = design_matrix.fillna(0)
    design_matrix_path = _get_design_matrix_path(subject, task)
    design_matrix_path.parent.mkdir(parents=True, exist_ok=True)
    design_matrix.to_csv(design_matrix_path, index=False)

    run_labels = [f"{run_i:02d}" for run_i in range(1, len(run_imgs) + 1)]
    custom_run_groups = _validate_run_groups(run_groups, len(run_imgs))
    run_group_summary = _build_run_group_summary(
        run_labels, orthogs, custom_run_groups
    )
    run_group_summary.to_csv(_get_run_group_info_path(subject, task), index=False)

    contrasts_ = contrasts.copy()
    for con_i, (contrast_name, contrast_expr) in enumerate(contrasts_):
        contrast_expr_by_run = {
            f"run-{run_i:02d}_{reg_name}": v / len(run_imgs)
            for reg_name, v in contrast_expr.items()
            for run_i in range(1, len(run_imgs) + 1)
        }
        for reg_name in contrast_expr_by_run.keys():
            if reg_name not in design_matrix.columns:
                raise ValueError(
                    f"For contrast '{contrast_name}', "
                    f"invalid regressor name: '{reg_name}'."
                )
        contrasts_[con_i] = (contrast_name, contrast_expr_by_run)

    for con_i, (contrast_name, contrast_expr) in enumerate(contrasts_):
        contrast_vector = [
            contrast_expr.get(col, 0) for col in design_matrix.columns
        ]
        _register_contrast(subject, task, contrast_name, contrast_vector)
        contrasts_[con_i] = (contrast_name, contrast_vector)

    model.fit(run_img_grand, design_matrices=design_matrix)
    model.residuals[0].to_filename(_get_residuals_path(subject, task))

    for contrast_name, contrast_vector in contrasts_:
        for run_i in range(1, len(run_imgs) + 1):
            run_contrast_vector = [
                (
                    v * len(run_imgs)
                    if label.startswith(f"run-{run_i:02d}_")
                    else 0
                )
                for label, v in zip(design_matrix.columns, contrast_vector)
            ]
            _compute_contrast_volume(
                subject,
                task,
                f"{run_i:02d}",
                model,
                contrast_name,
                run_contrast_vector,
            )

        _compute_contrast_volume(
            subject,
            task,
            "all",
            model,
            contrast_name,
            contrast_vector,
        )

        for group_label, group_runs in custom_run_groups.items():
            group_contrast_vector = [
                (
                    v * len(run_imgs) / len(group_runs)
                    if label.split("_")[0].replace("run-", "") in group_runs
                    else 0
                )
                for label, v in zip(design_matrix.columns, contrast_vector)
            ]
            _compute_contrast_volume(
                subject,
                task,
                group_label,
                model,
                contrast_name,
                group_contrast_vector,
            )

        if len(run_imgs) == 1:
            continue

        if "odd-even" in orthogs:
            for rem, run_label in zip([1, 0], ["odd", "even"]):
                orthog_contrast_expr = [
                    (
                        (
                            v
                            if int(label.split("_")[0].split("-")[1]) % 2
                            == rem
                            else 0
                        )
                        * (len(run_imgs))
                        / (
                            len(run_imgs) // 2
                            if rem == 0
                            else len(run_imgs) - len(run_imgs) // 2
                        )
                    )
                    for label, v in zip(design_matrix.columns, contrast_vector)
                ]
                _compute_contrast_volume(
                    subject,
                    task,
                    run_label,
                    model,
                    contrast_name,
                    orthog_contrast_expr,
                )

        if "all-but-one" in orthogs:
            for run_i in range(1, len(run_imgs) + 1):
                orthog_contrast_expr = [
                    (
                        v * len(run_imgs) / (len(run_imgs) - 1)
                        if not label.startswith(f"run-{run_i:02d}_")
                        else 0
                    )
                    for label, v in zip(design_matrix.columns, contrast_vector)
                ]
                _compute_contrast_volume(
                    subject,
                    task,
                    f"orth{run_i:02d}",
                    model,
                    contrast_name,
                    orthog_contrast_expr,
                )


def _run_subject_surface_first_level(
    subject,
    task,
    model,
    run_specs,
    derivatives_root,
    surface_space,
    events,
    confounds,
    contrasts,
    orthogs,
    run_groups,
):
    run_imgs = [
        _load_run_image(run_img, surface_space, derivatives_root)
        for run_img in run_specs
    ]

    if confounds is not None and not isinstance(confounds, list):
        confounds = [confounds]

    contrasts_folder = _get_contrast_folder(subject, task)
    contrasts_folder.mkdir(parents=True, exist_ok=True)

    run_design_matrices = []
    prefixed_design_matrices = []
    for run_i, (events_i, imgs_i) in enumerate(
        zip(events, run_imgs, strict=False), start=1
    ):
        frame_times = np.arange(imgs_i.shape[-1]) * model.t_r
        design_matrix = make_first_level_design_matrix(
            frame_times=frame_times,
            events=events_i,
            hrf_model=model.hrf_model,
            drift_model=model.drift_model,
            high_pass=model.high_pass,
            drift_order=model.drift_order,
            fir_delays=model.fir_delays,
            min_onset=model.min_onset,
        )

        if confounds is not None:
            design_matrix = pd.concat(
                [
                    design_matrix.reset_index(drop=True),
                    confounds[run_i - 1].reset_index(drop=True),
                ],
                axis=1,
            )

        run_design_matrices.append(design_matrix)

        design_matrix_prefixed = design_matrix.copy()
        design_matrix_prefixed.columns = [
            f"run-{run_i:02d}_{col}" for col in design_matrix.columns
        ]
        prefixed_design_matrices.append(design_matrix_prefixed)

    design_matrix = pd.concat(prefixed_design_matrices, axis=0).fillna(0)
    design_matrix_path = _get_design_matrix_path(subject, task)
    design_matrix_path.parent.mkdir(parents=True, exist_ok=True)
    design_matrix.to_csv(design_matrix_path, index=False)

    run_labels = [f"{run_i:02d}" for run_i in range(1, len(run_imgs) + 1)]
    custom_run_groups = _validate_run_groups(run_groups, len(run_imgs))
    run_group_summary = _build_run_group_summary(
        run_labels, orthogs, custom_run_groups
    )
    run_group_summary.to_csv(_get_run_group_info_path(subject, task), index=False)

    contrasts_ = []
    for contrast_name, contrast_expr in contrasts.copy():
        contrast_expr_by_run = {
            f"run-{run_i:02d}_{reg_name}": v / len(run_imgs)
            for reg_name, v in contrast_expr.items()
            for run_i in range(1, len(run_imgs) + 1)
        }
        for reg_name in contrast_expr_by_run.keys():
            if reg_name not in design_matrix.columns:
                raise ValueError(
                    f"For contrast '{contrast_name}', "
                    f"invalid regressor name: '{reg_name}'."
                )
        contrast_vector = [
            contrast_expr_by_run.get(col, 0) for col in design_matrix.columns
        ]
        _register_contrast(subject, task, contrast_name, contrast_vector)

        run_contrast_vectors = []
        for run_design_matrix in run_design_matrices:
            run_contrast_vectors.append(
                np.array(
                    [
                        contrast_expr.get(column, 0)
                        for column in run_design_matrix.columns
                    ],
                    dtype=float,
                )
            )
        contrasts_.append((contrast_name, run_contrast_vectors))

    model.fit(run_imgs, design_matrices=run_design_matrices)
    _save_image(
        model.residuals[0],
        _get_residuals_path(subject, task),
        {
            hemi: _get_surface_residuals_path(subject, task, hemi)
            for hemi in SURFACE_HEMIS
        },
    )

    for contrast_name, run_contrast_vectors in contrasts_:
        for run_i in range(1, len(run_imgs) + 1):
            single_run_contrast = _zero_run_contrast_vectors(
                run_contrast_vectors
            )
            single_run_contrast[run_i - 1] = run_contrast_vectors[run_i - 1]
            _compute_contrast_surface(
                subject,
                task,
                f"{run_i:02d}",
                model,
                contrast_name,
                single_run_contrast,
            )

        _compute_contrast_surface(
            subject,
            task,
            "all",
            model,
            contrast_name,
            run_contrast_vectors,
        )

        for group_label, group_runs in custom_run_groups.items():
            group_contrast = _zero_run_contrast_vectors(run_contrast_vectors)
            for run_label in group_runs:
                group_index = int(run_label) - 1
                group_contrast[group_index] = run_contrast_vectors[group_index]
            _compute_contrast_surface(
                subject,
                task,
                group_label,
                model,
                contrast_name,
                group_contrast,
            )

        if len(run_imgs) == 1:
            continue

        if "odd-even" in orthogs:
            for rem, run_label in zip([1, 0], ["odd", "even"]):
                orthog_contrast = _zero_run_contrast_vectors(
                    run_contrast_vectors
                )
                for run_i in range(1, len(run_imgs) + 1):
                    if run_i % 2 == rem:
                        orthog_contrast[run_i - 1] = run_contrast_vectors[
                            run_i - 1
                        ]
                _compute_contrast_surface(
                    subject,
                    task,
                    run_label,
                    model,
                    contrast_name,
                    orthog_contrast,
                )

        if "all-but-one" in orthogs:
            for run_i in range(1, len(run_imgs) + 1):
                orthog_contrast = _zero_run_contrast_vectors(
                    run_contrast_vectors
                )
                for other_run_i in range(1, len(run_imgs) + 1):
                    if other_run_i != run_i:
                        orthog_contrast[other_run_i - 1] = (
                            run_contrast_vectors[other_run_i - 1]
                        )
                _compute_contrast_surface(
                    subject,
                    task,
                    f"orth{run_i:02d}",
                    model,
                    contrast_name,
                    orthog_contrast,
                )


[docs] def run_first_level( task: str, subjects: Optional[List[str]] = None, space: Optional[str] = None, data_filter: Optional[List[Tuple[str, str]]] = [], contrasts: Optional[List[Tuple[str, Dict[str, float]]]] = [], orthogs: Optional[List[str]] = ["all-but-one", "odd-even"], run_groups: Optional[Dict[str, List[int]]] = None, fd_threshold: Optional[float] = None, std_dvars_threshold: Optional[float] = None, **kwargs, ): """ Run first-level analysis for a list of subjects. :param task: The task label. :type task: str :param subjects: List of subject labels. If None, all subjects are included. :type subjects: Optional[List[str]] :param space: The space name of the data. If None, the data is assumed to be in the native space. :type space: Optional[str] :param data_filter: Additional data filter, e.g. the resolution associated with the space. See Nilearn `get_bids_files` documentation for more information. :type data_filter: Optional[List[Tuple[str, str]]] :param contrasts: List of contrast definitions. Each contrast is a tuple of the contrast name and the contrast expression. The contrast expression is defined by a dictionary of regressor names and their weights. :type contrasts: Optional[List[Tuple[str, Dict[str, float]]] :param orthogs: List of orthogonalization strategies. For each group, contrast images are also generated for corresponding run labels. Supported strategies are 'all-but-one' and 'odd-even'. Default is both. :type orthogs: Optional[List[str]] :param run_groups: Optional custom run groups to compute alongside the built-in run labels. Keys are group names and values are 1-indexed run ids. :type run_groups: Optional[Dict[str, List[int]]] :param fd_threshold: Threshold for framewise displacement (FD) to be used for confound generation. :type fd_threshold: Optional[float] :param std_dvars_threshold: Threshold for standard deviation of DVARS to be used for confound generation. :type std_dvars_threshold: Optional[float] :param kwargs: Additional keyword arguments for the first-level analysis. See Nilearn `first_level_from_bids` documentation for more information. :type kwargs: Dict """ try: bids_data_folder = get_bids_data_folder() except (ValueError, RuntimeError): raise ValueError( "The output directory is not set. The default output directory " "cannot be inferred from the BIDS data folder." ) try: bids_data_folder = get_bids_data_folder() derivatives_folder = get_bids_preprocessed_folder_relative() except (ValueError, RuntimeError): bids_data_folder = get_bids_preprocessed_folder() derivatives_folder = "." bids_data_folder = Path(bids_data_folder) derivatives_root = ( bids_data_folder if derivatives_folder == "." else bids_data_folder / derivatives_folder ) surface_requested = _uses_surface_space(derivatives_root, task, space) bootstrap_space = space if surface_requested: bootstrap_space = _infer_bootstrap_volume_space( derivatives_root, task ) if bootstrap_space is None: ( models, models_run_imgs, models_events, models_confounds, ) = _collect_surface_bids_inputs( bids_data_folder, derivatives_root, task, subjects, space, data_filter, fd_threshold, std_dvars_threshold, kwargs, ) else: ( models, models_run_imgs, models_events, models_confounds, ) = first_level_from_bids( bids_data_folder, task, sub_labels=subjects, space_label=bootstrap_space, derivatives_folder=derivatives_folder, img_filters=data_filter, minimize_memory=False, **kwargs, ) else: ( models, models_run_imgs, models_events, models_confounds, ) = first_level_from_bids( bids_data_folder, task, sub_labels=subjects, space_label=bootstrap_space, derivatives_folder=derivatives_folder, img_filters=data_filter, minimize_memory=False, **kwargs, ) if ( bootstrap_space is not None or not surface_requested ) and (fd_threshold is not None or std_dvars_threshold is not None): if fd_threshold is None: fd_threshold = np.inf if std_dvars_threshold is None: std_dvars_threshold = np.inf for sub_i in range(len(models)): imgs = models_run_imgs[sub_i] confounds = models_confounds[sub_i] confounds, masks = load_confounds( imgs, fd_threshold=fd_threshold, std_dvars_threshold=std_dvars_threshold, strategy=["scrub"], ) if not isinstance(confounds, list): models_confounds[sub_i] = [confounds] masks = [masks] n_runs = len(models_confounds[sub_i]) for run_i in range(n_runs): confounds_i = models_confounds[sub_i][run_i] masks_i = masks[run_i] if masks_i is not None: outlier_indexes = set(confounds_i.index) - set(masks_i) else: outlier_indexes = {} for outlier_index in outlier_indexes: models_confounds[sub_i][run_i][ f"outlier_index_{outlier_index}" ] = (np.arange(len(confounds_i)) == outlier_index).astype( int ) for sub_i in range(len(models)): model = models[sub_i] subject = model.subject_label run_specs = models_run_imgs[sub_i] events = models_events[sub_i] confounds = models_confounds[sub_i] if surface_requested: _run_subject_surface_first_level( subject, task, model, run_specs, derivatives_root, space, events, confounds, contrasts, orthogs, run_groups, ) else: run_imgs = [ load_img(os.path.realpath(img)) for img in run_specs ] _run_subject_volume_first_level( subject, task, model, run_imgs, events, confounds, contrasts, orthogs, run_groups, )