Source code for fileio

from __future__ import print_function

import os
import re
import sys
import tempfile

import nibabel as nib
import numpy as np
import pandas as pd

try:  # run as a package if installed
    from pcntoolkit import configs
except ImportError:
    pass

    path = os.path.abspath(os.path.dirname(__file__))
    path = os.path.dirname(path)  # parent directory
    if path not in sys.path:
        sys.path.append(path)
    del path
    import configs

CIFTI_MAPPINGS = (
    "dconn",
    "dtseries",
    "pconn",
    "ptseries",
    "dscalar",
    "dlabel",
    "pscalar",
    "pdconn",
    "dpconn",
    "pconnseries",
    "pconnscalar",
)

CIFTI_VOL_ATLAS = "Atlas_ROIs.2.nii.gz"

PICKLE_PROTOCOL = configs.PICKLE_PROTOCOL

# ------------------------
# general utility routines
# ------------------------


[docs] def predictive_interval(s2_forward, cov_forward, multiplicator): """ Calculates a predictive interval for the forward model """ # calculates a predictive interval PI = np.zeros(len(cov_forward)) for i, xdot in enumerate(cov_forward): s = np.sqrt(s2_forward[i]) PI[i] = multiplicator * s return PI
[docs] def create_mask(data_array, mask, verbose=False): """ Create a mask from a data array or a nifti file Basic usage:: create_mask(data_array, mask, verbose) :param data_array: numpy array containing the data to write out :param mask: nifti image containing a mask for the image :param verbose: verbose output """ # create a (volumetric) mask either from an input nifti or the nifti itself if mask is not None: if verbose: print("Loading ROI mask ...") maskvol = load_nifti(mask, vol=True) maskvol = maskvol != 0 else: if len(data_array.shape) < 4: dim = data_array.shape[0:3] + (1,) else: dim = data_array.shape[0:3] + (data_array.shape[3],) if verbose: print("Generating mask automatically ...") if dim[3] == 1: maskvol = data_array[:, :, :] != 0 else: maskvol = data_array[:, :, :, 0] != 0 return maskvol
[docs] def vol2vec(dat, mask, verbose=False): """ Vectorise a 3d image Basic usage:: vol2vec(dat, mask, verbose) :param dat: numpy array containing the data to write out :param mask: nifti image containing a mask for the image :param verbose: verbose output """ # vectorise a 3d image if len(dat.shape) < 4: dim = dat.shape[0:3] + (1,) else: dim = dat.shape[0:3] + (dat.shape[3],) # mask = create_mask(dat, mask=mask, verbose=verbose) if mask is None: mask = create_mask(dat, mask=mask, verbose=verbose) # mask the image maskid = np.where(mask.ravel())[0] dat = np.reshape(dat, (np.prod(dim[0:3]), dim[3])) dat = dat[maskid, :] # convert to 1-d array if the file only contains one volume if dim[3] == 1: dat = dat.ravel() return dat
[docs] def file_type(filename): """ Determine the file type of a file Basic usage:: file_type(filename) :param filename: name of the file to check """ # routine to determine filetype if filename.endswith((".dtseries.nii", ".dscalar.nii", ".dlabel.nii")): ftype = "cifti" elif filename.endswith((".nii.gz", ".nii", ".img", ".hdr")): ftype = "nifti" elif filename.endswith((".txt", ".csv", ".tsv", ".asc")): ftype = "text" elif filename.endswith((".pkl")): ftype = "binary" else: raise ValueError("I don't know what to do with " + filename) return ftype
[docs] def file_extension(filename): """ Determine the file extension of a file (e.g. .nii.gz) Basic usage:: file_extension(filename) :param filename: name of the file to check """ # routine to get the full file extension (e.g. .nii.gz, not just .gz) parts = filename.split(os.extsep) if parts[-1] == "gz": if parts[-2] == "nii" or parts[-2] == "img" or parts[-2] == "hdr": ext = parts[-2] + "." + parts[-1] else: ext = parts[-1] elif parts[-1] == "nii": if parts[-2] in CIFTI_MAPPINGS: ext = parts[-2] + "." + parts[-1] else: ext = parts[-1] else: ext = parts[-1] ext = "." + ext return ext
[docs] def file_stem(filename): """ Determine the file stem of a file (e.g. /path/to/file.nii.gz -> file) Basic usage:: file_stem(filename) :param filename: name of the file to check """ idx = filename.find(file_extension(filename)) stm = filename[0:idx] return stm
# -------------- # nifti routines # --------------
[docs] def load_nifti(datafile, mask=None, vol=False, verbose=False): """ Load a nifti file into a numpy array Basic usage:: load_nifti(datafile, mask, vol, verbose) :param datafile: name of the file to load :param mask: nifti image containing a mask for the image :param vol: whether to load the image as a volume :param verbose: verbose output """ if verbose: print("Loading nifti: " + datafile + " ...") img = nib.load(datafile) dat = img.get_fdata() if mask is not None: mask = load_nifti(mask, vol=True) if not vol: dat = vol2vec(dat, mask) return dat
[docs] def save_nifti(data, filename, examplenii, mask, dtype=None): """ Write output to nifti Basic usage:: save_nifti(data, filename mask, dtype) :param data: numpy array containing the data to write out :param filename: where to store it :param examplenii: nifti to copy the geometry and data type from :mask: nifti image containing a mask for the image :param dtype: data type for the output image (if different from the image) """ # load mask if isinstance(mask, str): mask = load_nifti(mask, vol=True) mask = mask != 0 # load example image ex_img = nib.load(examplenii) ex_img.shape dim = ex_img.shape[0:3] if len(data.shape) < 2: nvol = 1 data = data[:, np.newaxis] else: nvol = int(data.shape[1]) # write data array_data = np.zeros((np.prod(dim), nvol)) array_data[mask.flatten(), :] = data array_data = np.reshape(array_data, dim + (nvol,)) hdr = ex_img.header if dtype is not None: hdr.set_data_dtype(dtype) array_data = array_data.astype(dtype) array_img = nib.Nifti1Image(array_data, ex_img.affine, hdr) nib.save(array_img, filename)
# -------------- # cifti routines # --------------
[docs] def load_cifti(filename, vol=False, mask=None, rmtmp=True): """ Load a cifti file into a numpy array Basic usage:: load_cifti(filename, vol, mask, rmtmp) :param filename: name of the file to load :param vol: whether to load the image as a volume :param mask: nifti image containing a mask for the image :param rmtmp: whether to remove temporary files """ # parse the name dnam, fnam = os.path.split(filename) fpref = file_stem(fnam) outstem = os.path.join(tempfile.gettempdir(), str(os.getpid()) + "-" + fpref) # extract surface data from the cifti file print("Extracting cifti surface data to ", outstem, "-*.func.gii", sep="") giinamel = outstem + "-left.func.gii" giinamer = outstem + "-right.func.gii" os.system( "wb_command -cifti-separate " + filename + " COLUMN -metric CORTEX_LEFT " + giinamel ) os.system( "wb_command -cifti-separate " + filename + " COLUMN -metric CORTEX_RIGHT " + giinamer ) # load the surface data giil = nib.load(giinamel) giir = nib.load(giinamer) Nimg = len(giil.darrays) Nvert = len(giil.darrays[0].data) if Nimg == 1: out = np.concatenate((giil.darrays[0].data, giir.darrays[0].data), axis=0) else: Gl = np.zeros((Nvert, Nimg)) Gr = np.zeros((Nvert, Nimg)) for i in range(0, Nimg): Gl[:, i] = giil.darrays[i].data Gr[:, i] = giir.darrays[i].data out = np.concatenate((Gl, Gr), axis=0) if rmtmp: # clean up temporary files os.remove(giinamel) os.remove(giinamer) if vol: niiname = outstem + "-vol.nii" print("Extracting cifti volume data to ", niiname, sep="") os.system( "wb_command -cifti-separate " + filename + " COLUMN -volume-all " + niiname ) vol = load_nifti(niiname, vol=True) volmask = create_mask(vol) out = np.concatenate((out, vol2vec(vol, volmask)), axis=0) if rmtmp: os.remove(niiname) return out
[docs] def save_cifti(data, filename, example, mask=None, vol=True, volatlas=None): """ Save a cifti file from a numpy array Basic usage:: save_cifti(data, filename, example, mask, vol, volatlas) :param data: numpy array containing the data to write out :param filename: where to store it :param example: example file to copy the geometry from :param mask: nifti image containing a mask for the image :param vol: whether to load the image as a volume :param volatlas: atlas to use for the volume """ # do some sanity checks if data.dtype == "float32" or data.dtype == "float" or data.dtype == "float64": data = data.astype("float32") # force 32 bit output dtype = "NIFTI_TYPE_FLOAT32" else: raise ValueError("Only float data types currently handled") if len(data.shape) == 1: Nimg = 1 data = data[:, np.newaxis] else: Nimg = data.shape[1] # get the base filename dnam, fnam = os.path.split(filename) fstem = file_stem(fnam) # Split the template estem = os.path.join(tempfile.gettempdir(), str(os.getpid()) + "-" + fstem) giiexnamel = estem + "-left.func.gii" giiexnamer = estem + "-right.func.gii" os.system( "wb_command -cifti-separate " + example + " COLUMN -metric CORTEX_LEFT " + giiexnamel ) os.system( "wb_command -cifti-separate " + example + " COLUMN -metric CORTEX_RIGHT " + giiexnamer ) # write left hemisphere giiexl = nib.load(giiexnamel) Nvertl = len(giiexl.darrays[0].data) garraysl = [] for i in range(0, Nimg): garraysl.append( nib.gifti.gifti.GiftiDataArray(data=data[0:Nvertl, i], datatype=dtype) ) giil = nib.gifti.gifti.GiftiImage(darrays=garraysl) fnamel = fstem + "-left.func.gii" nib.save(giil, fnamel) # write right hemisphere giiexr = nib.load(giiexnamer) Nvertr = len(giiexr.darrays[0].data) garraysr = [] for i in range(0, Nimg): garraysr.append( nib.gifti.gifti.GiftiDataArray( data=data[Nvertl : Nvertl + Nvertr, i], datatype=dtype ) ) giir = nib.gifti.gifti.GiftiImage(darrays=garraysr) fnamer = fstem + "-right.func.gii" nib.save(giir, fnamer) tmpfiles = [fnamer, fnamel, giiexnamel, giiexnamer] # process volumetric data if vol: niiexname = estem + "-vol.nii" os.system( "wb_command -cifti-separate " + example + " COLUMN -volume-all " + niiexname ) niivol = load_nifti(niiexname, vol=True) if mask is None: mask = create_mask(niivol) if volatlas is None: volatlas = CIFTI_VOL_ATLAS fnamev = fstem + "-vol.nii" save_nifti(data[Nvertr + Nvertl :, :], fnamev, niiexname, mask) tmpfiles.extend([fnamev, niiexname]) # write cifti fname = fstem + ".dtseries.nii" os.system( "wb_command -cifti-create-dense-timeseries " + fname + " -volume " + fnamev + " " + volatlas + " -left-metric " + fnamel + " -right-metric " + fnamer ) # clean up for f in tmpfiles: os.remove(f)
# -------------- # ascii routines # --------------
[docs] def load_pd(filename): """ Load a csv file into a pandas dataframe Basic usage:: load_pd(filename) :param filename: name of the file to load """ # based on pandas x = pd.read_csv(filename, sep=" ", header=None) return x
[docs] def save_pd(data, filename): """ Save a pandas dataframe to a csv file Basic usage:: save_pd(data, filename) :param data: pandas dataframe containing the data to write out :param filename: where to store it """ # based on pandas data.to_csv(filename, index=None, header=None, sep=" ", na_rep="NaN")
[docs] def load_ascii(filename): """ Load an ascii file into a numpy array Basic usage:: load_ascii(filename) :param filename: name of the file to load """ # based on pandas x = np.loadtxt(filename) return x
[docs] def save_ascii(data, filename): """ Save a numpy array to an ascii file Basic usage:: save_ascii(data, filename) :param data: numpy array containing the data to write out :param filename: where to store it """ # based on pandas np.savetxt(filename, data)
# ---------------- # generic routines # ----------------
[docs] def save(data, filename, example=None, mask=None, text=False, dtype=None): """ Save a numpy array to a file Basic usage:: save(data, filename, example, mask, text, dtype) :param data: numpy array containing the data to write out :param filename: where to store it :param example: example file to copy the geometry from :param mask: nifti image containing a mask for the image :param text: whether to write out a text file :param dtype: data type for the output image (if different from the image) """ if file_type(filename) == "cifti": save_cifti(data.T, filename, example, vol=True) elif file_type(filename) == "nifti": save_nifti(data.T, filename, example, mask, dtype=dtype) elif text or file_type(filename) == "text": save_ascii(data, filename) elif file_type(filename) == "binary": data = pd.DataFrame(data) data.to_pickle(filename, protocol=PICKLE_PROTOCOL)
[docs] def load(filename, mask=None, text=False, vol=True): """ Load a numpy array from a file Basic usage:: load(filename, mask, text, vol) :param filename: name of the file to load :param mask: nifti image containing a mask for the image :param text: whether to write out a text file :param vol: whether to load the image as a volume """ if file_type(filename) == "cifti": x = load_cifti(filename, vol=vol) elif file_type(filename) == "nifti": x = load_nifti(filename, mask, vol=vol) elif text or file_type(filename) == "text": x = load_ascii(filename) elif file_type(filename) == "binary": x = pd.read_pickle(filename) x = x.to_numpy() return x
# ------------------- # sorting routines for batched in normative parallel # -------------------
[docs] def tryint(s): """ Try to convert a string to an integer Basic usage:: tryint(s) :param s: string to convert """ try: return int(s) except ValueError: return s
[docs] def alphanum_key(s): """ Turn a string into a list of numbers Basic usage:: alphanum_key(s) :param s: string to convert """ return [tryint(c) for c in re.split("([0-9]+)", s)]
[docs] def sort_nicely(l): """ Sort a list of strings in a natural way Basic usage:: sort_nicely(l) :param l: list of strings to sort """ return sorted(l, key=alphanum_key)