# Source code for funROI.datasets.hcp

import warnings
import os
import boto3
import pandas as pd
from ..utils import ensure_paths
import json
import shutil
import pathlib
from typing import List, Union


def _get_events(ev_folder_path, events):
    """
    Build a single events table from per-condition EV text files.

    For every condition in ``events`` the tab-separated, header-less file
    ``<ev_folder_path>/<condition>.txt`` is read with its three columns
    named ``onset``, ``duration`` and ``amplitude``. The rows of all
    conditions are merged and ordered by onset. The amplitude column is
    dropped from the result.

    :param ev_folder_path: Folder holding one ``<condition>.txt`` file per
        condition.
    :type ev_folder_path: Union[str, pathlib.Path]
    :param events: Condition names; each one is also used as the file stem.
    :type events: Iterable[str]
    :return: DataFrame with the columns ``trial_type``, ``onset`` and
        ``duration``. ``trial_type`` is the condition name with every
        non-alphanumeric character removed (e.g. ``0bk_body`` becomes
        ``0bkbody``).
    :rtype: pandas.DataFrame
    """
    columns = ["trial_type", "onset", "duration"]
    ev_folder = pathlib.Path(ev_folder_path)

    # Collect the per-condition frames and concatenate once, instead of
    # growing a DataFrame inside the loop.
    frames = []
    for condition in events:
        ev_data = pd.read_csv(
            ev_folder / f"{condition}.txt",
            sep="\t",
            header=None,
            names=["onset", "duration", "amplitude"],
        )
        ev_data["trial_type"] = condition
        frames.append(ev_data)

    if not frames:
        # pd.concat refuses an empty list; keep the documented schema.
        return pd.DataFrame(columns=columns)

    events_df = pd.concat(frames, ignore_index=True)
    # A stable sort keeps the input condition order for identical onsets.
    events_df = events_df.sort_values("onset", kind="stable").reset_index(
        drop=True
    )
    # regex=True is required: pandas >= 2.0 treats the pattern as a literal
    # string by default, which would leave the names unsanitized.
    events_df["trial_type"] = events_df["trial_type"].str.replace(
        r"[^a-zA-Z0-9]", "", regex=True
    )
    return events_df[columns]


def _list_s3_objects(s3_client, bucket_name, prefix):
    """
    List every object key in a bucket that starts with ``prefix``.

    ``list_objects_v2`` returns the listing one page at a time, so this
    keeps requesting pages with the continuation token until the response
    is no longer marked as truncated.

    :param s3_client: Client object exposing ``list_objects_v2``.
    :param bucket_name: Name of the bucket to list.
    :type bucket_name: str
    :param prefix: Key prefix to filter on.
    :type prefix: str
    :return: All matching keys, in the order the service returned them.
        Empty when nothing matches.
    :rtype: List[str]
    """
    keys = []
    request = {"Bucket": bucket_name, "Prefix": prefix}
    while True:
        page = s3_client.list_objects_v2(**request)
        # "Contents" is absent from the response when a page has no objects.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))

        token = page.get("NextContinuationToken")
        if not page.get("IsTruncated") or not token:
            break
        request["ContinuationToken"] = token
    return keys


def _download_file(s3_client, bucket_name, s3_key, local_path):
    """
    Download one object to ``local_path``, creating parent folders first.

    This is best-effort: any exception is caught and reported on stdout
    together with the key, and nothing is raised to the caller.

    :param s3_client: Client object exposing ``download_file``.
    :param bucket_name: Name of the bucket holding the object.
    :type bucket_name: str
    :param s3_key: Key of the object to download.
    :type s3_key: str
    :param local_path: Destination file path.
    :type local_path: Union[str, pathlib.Path]
    """
    try:
        # Hand the client a plain string: not every boto3 release accepts
        # path-like objects for the destination filename.
        target = os.fspath(local_path)
        parent = os.path.dirname(target)
        # A bare filename has no parent component; os.makedirs("") fails.
        if parent:
            os.makedirs(parent, exist_ok=True)
        s3_client.download_file(bucket_name, s3_key, target)
    except Exception as e:
        print(e)
        print(f"Missing or failed: {s3_key}")


def _download_selected(parent_dir, s3_client, subject, task):
    """
    Download the LR and RL result folders of one task for one subject.

    Keys are listed in the ``hcp-openaccess`` bucket under
    ``HCP_1200/<subject>/MNINonLinear/Results/tfMRI_<task>_<LR|RL>`` and
    each one is saved to ``parent_dir / <key>``, so the local tree mirrors
    the bucket layout.

    :param parent_dir: Local root for the downloads; joined to each key
        with ``/``.
    :type parent_dir: pathlib.Path
    :param s3_client: S3 client handed on to the listing and download
        helpers.
    :param subject: Subject ID as it appears in the bucket path.
    :param task: Task name as it appears in the folder name.
    """
    bucket = "hcp-openaccess"
    for direction in ("LR", "RL"):
        prefix = (
            f"HCP_1200/{subject}/MNINonLinear/Results/"
            f"tfMRI_{task}_{direction}"
        )
        for key in _list_s3_objects(s3_client, bucket, prefix):
            _download_file(s3_client, bucket, key, parent_dir / key)


@ensure_paths("data_dir", "bids_dir")
def _convert_to_bids(data_dir, bids_dir, subject, task):
    """
    Convert the downloaded runs of one subject and task into BIDS-style
    files, then delete the source run folders.

    Every folder in ``<data_dir>/<subject>/MNINonLinear/Results`` whose name
    contains ``tfMRI_<task>`` is treated as one run. Runs are processed in
    sorted path order so run numbers are reproducible. For each run the
    following files are written to ``<bids_dir>/sub-<subject>/func``:

    - the brain mask and the BOLD image (copied),
    - a ``_bold.json`` sidecar holding fixed acquisition metadata,
    - a confounds TSV built from ``Movement_Regressors.txt``,
    - an events TSV built from the run's ``EVs`` folder.

    The run folder is removed once all of its files have been written.

    :param data_dir: Folder containing the per-subject download tree.
    :type data_dir: pathlib.Path
    :param bids_dir: Root of the output dataset.
    :type bids_dir: pathlib.Path
    :param subject: Subject ID; used as a folder name and in file names.
    :type subject: str
    :param task: One of "LANGUAGE", "MOTOR", "WM" or "SOCIAL".
    :type task: str
    :raises ValueError: If no event conditions are defined for ``task``.
    """
    # Blocks, as specified in https://www.humanconnectome.org/hcp-protocols-ya-task-fmri
    task_events = {
        "LANGUAGE": ["math", "story"],
        "MOTOR": ["cue", "t", "lf", "rf", "lh", "rh"],
        "WM": [
            "0bk_body",
            "0bk_faces",
            "0bk_places",
            "0bk_tools",
            "2bk_body",
            "2bk_faces",
            "2bk_places",
            "2bk_tools",
        ],
        "SOCIAL": ["mental", "rnd"],
    }
    # Fail before touching any file rather than midway through a run.
    if task not in task_events:
        raise ValueError(f"No event conditions defined for task {task!r}")
    events = task_events[task]

    confound_columns = [
        "trans_x",
        "trans_y",
        "trans_z",
        "rot_x",
        "rot_y",
        "rot_z",
        "trans_dx",
        "trans_dy",
        "trans_dz",
        "rot_dx",
        "rot_dy",
        "rot_dz",
    ]

    results_dir = data_dir / subject / "MNINonLinear" / "Results"
    # iterdir() yields entries in arbitrary order; sorting keeps the run
    # numbering stable across file systems and repeated calls.
    run_folders = sorted(
        folder
        for folder in results_dir.iterdir()
        if f"tfMRI_{task}" in folder.name
    )

    for run_i, run_folder in enumerate(run_folders, start=1):
        # Folder names look like "tfMRI_<task>_<suffix>".
        run_task = run_folder.name.split("_")[1]
        run_filename = run_folder.name.split("_", 1)[1]
        run_suffix = run_folder.name.split("_")[-1]

        bids_folder = bids_dir / f"sub-{subject}" / "func"
        bids_folder.mkdir(parents=True, exist_ok=True)

        bids_prefix_no_space = (
            f"sub-{subject}_task-{run_task}_run-{run_i}_acq-{run_suffix}"
        )
        bids_prefix = bids_prefix_no_space + "_space-MNINonLinear"

        # Data files
        shutil.copy(
            run_folder / "brainmask_fs.2.nii.gz",
            bids_folder / f"{bids_prefix}_desc-brain_mask.nii.gz",
        )
        shutil.copy(
            run_folder / f"tfMRI_{run_filename}.nii.gz",
            bids_folder / f"{bids_prefix}_desc-preproc_bold.nii.gz",
        )

        # BOLD configuration
        with open(bids_folder / f"{bids_prefix}_bold.json", "w") as f:
            json.dump(
                {
                    "RepetitionTime": 0.72,
                    "EchoTime": 0.0331,
                    "EffectiveEchoSpacing": 0.00058,
                    "MagneticFieldStrength": 3.0,
                    "Manufacturer": "Siemens",
                    "ManufacturerModelName": "Skyra",
                    "PhaseEncodingDirection": (
                        "i-" if run_suffix == "LR" else "i"
                    ),
                    "TaskName": task,
                },
                f,
                indent=4,
            )

        # Confounds file: whitespace-separated floats, one row per line.
        # Blank lines are skipped so they cannot produce ragged rows.
        with open(run_folder / "Movement_Regressors.txt", "r") as f:
            data = [
                [float(x) for x in line.split()]
                for line in f
                if line.strip()
            ]
        pd.DataFrame(data, columns=confound_columns).to_csv(
            bids_folder
            / f"{bids_prefix_no_space}_desc-confounds_timeseries.tsv",
            sep="\t",
            index=False,
        )

        events_df = _get_events(run_folder / "EVs", events=events)
        events_df.to_csv(
            bids_folder / f"{bids_prefix_no_space}_events.tsv",
            sep="\t",
            index=False,
        )

        # The source run is only needed for the conversion; remove it once
        # every output of this run has been written.
        shutil.rmtree(run_folder)


@ensure_paths("data_dir")
def fetch_data(
    data_dir: Union[str, pathlib.Path], task: str, subjects: List[str]
) -> None:
    """
    Fetches the HCP dataset for a given task and subjects, and converts it to
    BIDS format.

    Raw files are downloaded below ``<data_dir>/HCP_1200`` and the converted
    dataset is written to ``<data_dir>/bids``. Subjects are processed one at
    a time; a failure for one subject is printed and the remaining subjects
    are still processed.

    :param data_dir: Path to the directory where the data will be stored.
    :type data_dir: Union[str, pathlib.Path]
    :param task: The task to fetch data for. Options are "LANGUAGE",
        "MOTOR", "WM", and "SOCIAL" (case-insensitive).
    :type task: str
    :param subjects: List of subject IDs to fetch data for (e.g.,
        ["100307", "100408"]). A single ID given as a bare string is treated
        as a one-element list.
    :type subjects: List[str]
    :raises ValueError: If ``task`` is not one of the supported tasks.
    """
    task = task.upper()
    if task not in ["LANGUAGE", "MOTOR", "WM", "SOCIAL"]:
        raise ValueError(
            "Unsupported task. Choose from LANGUAGE, MOTOR, WM, SOCIAL"
        )

    # Iterating a bare string would yield one "subject" per character.
    if isinstance(subjects, str):
        subjects = [subjects]

    # NOTE(review): relies on ensure_paths having turned data_dir into a
    # pathlib.Path by this point - confirm against the decorator.
    data_dir = data_dir.absolute()
    bids_dir = data_dir / "bids"
    data_dir.mkdir(parents=True, exist_ok=True)
    bids_dir.mkdir(parents=True, exist_ok=True)

    s3_client = boto3.client("s3")

    for subject in subjects:
        # IDs are joined into paths downstream, so they must be strings.
        subject = str(subject)
        try:
            _download_selected(data_dir, s3_client, subject, task)
            _convert_to_bids(data_dir / "HCP_1200", bids_dir, subject, task)
        except Exception as e:
            # Best effort: report the failure and move on.
            print(f"Error processing {subject}: {e}")
# shutil.rmtree(data_dir / "HCP_1200")