Source code for normative_parallel

#!/opt/conda/bin/python

# -----------------------------------------------------------------------------
# Run parallel normative modelling.
# All processing takes place in the processing directory (processing_dir)
# All inputs should be text files or binaries and space separated
#
# It is possible to run these functions using...
#
# * k-fold cross-validation
# * estimating a training dataset then applying to a second test dataset
#
# First, the data is split for parallel processing.
# Second, the splits are submitted to the cluster.
# Third, the output is collected and combined.
#
# written by (primarily) T Wolfers, (adapted) SM Kia, H Huijsdens, L Parks,
# S Rutherford, AF Marquand
# -----------------------------------------------------------------------------
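# Illustrative workflow (not part of the original module): a minimal sketch of
# the split/submit/collect cycle described above, using hypothetical paths and
# a torque queue. Adjust paths, memory and duration to your own data and
# cluster before use.
#
#   import pcntoolkit.normative_parallel as npar
#
#   processing_dir = '/project/nm_processing/'      # hypothetical
#   python_path = '/opt/conda/bin/python'           # hypothetical
#   npar.execute_nm(processing_dir, python_path, 'nm_job',
#                   '/project/cov.txt', '/project/resp.txt',
#                   batch_size=10, memory='4gb', duration='01:00:00',
#                   testcovfile_path='/project/testcov.txt',
#                   testrespfile_path='/project/testresp.txt',
#                   cluster_spec='torque', interactive='auto')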

from __future__ import print_function
from __future__ import division

import os
import sys
import glob
import shutil
import pickle
import fileinput
import time
import numpy as np
import pandas as pd
from subprocess import call, check_output


try:
    import pcntoolkit as ptk
    import pcntoolkit.dataio.fileio as fileio
    from pcntoolkit import configs
    from pcntoolkit.util.utils import yes_or_no
    ptkpath = ptk.__path__[0]
except ImportError:
    ptkpath = os.path.abspath(os.path.dirname(__file__))
    if ptkpath not in sys.path:
        sys.path.append(ptkpath)
    import dataio.fileio as fileio
    import configs
    from util.utils import yes_or_no


PICKLE_PROTOCOL = configs.PICKLE_PROTOCOL


def execute_nm(processing_dir, python_path, job_name, covfile_path,
               respfile_path, batch_size, memory, duration,
               normative_path=None, func='estimate', interactive=False,
               **kwargs):
    '''
    Execute parallel normative models.

    This function is a top-level wrapper that executes all parallel normative
    modelling routines. Different specifications are possible using the
    sub-functions.

    Basic usage::

        execute_nm(processing_dir, python_path, job_name, covfile_path,
                   respfile_path, batch_size, memory, duration)

    :param processing_dir: Full path to the processing dir
    :param python_path: Full path to the python distribution
    :param normative_path: Full path to normative.py. If None (default) the
     path is retrieved automatically from the installed package.
    :param job_name: Name for the bash script that is the output of this function
    :param covfile_path: Full path to a .txt file that contains all covariates
     (subjects x covariates) for the response file
    :param respfile_path: Full path to a .txt file that contains all features
     (subjects x features)
    :param batch_size: Number of features in each batch
    :param memory: Memory requirements written as string, for example 4gb or 500mb
    :param duration: The approximate duration of the job, a string with
     HH:MM:SS, for example 01:01:01
    :param cv_folds: Number of cross validations
    :param testcovfile_path: Full path to a .txt file that contains all
     covariates (subjects x covariates) for the test response file
    :param testrespfile_path: Full path to a .txt file that contains all test features
    :param log_path: Path for saving log files
    :param binary: If True, uses binary format for the response file, otherwise text
    :param interactive: If False (default) the user must manually rerun the
     failed jobs or collect the results. If 'auto', the job status is checked
     until all jobs are completed, then the failed jobs are rerun and the
     results are collected automatically. 'query' is similar to 'auto' except
     that it asks for user confirmation, which makes it immune to endless
     loops in the case of bugs in the code.

    written by (primarily) T Wolfers, (adapted) SM Kia
    The documentation is adapted by S Rutherford.
    '''

    if normative_path is None:
        normative_path = ptkpath + '/normative.py'

    cv_folds = kwargs.get('cv_folds', None)
    testcovfile_path = kwargs.get('testcovfile_path', None)
    testrespfile_path = kwargs.get('testrespfile_path', None)
    outputsuffix = kwargs.get('outputsuffix', 'estimate')
    cluster_spec = kwargs.pop('cluster_spec', 'torque')
    log_path = kwargs.get('log_path', None)
    binary = kwargs.pop('binary', False)

    split_nm(processing_dir, respfile_path, batch_size, binary, **kwargs)

    batch_dir = glob.glob(processing_dir + 'batch_*')
    # print(batch_dir)
    number_of_batches = len(batch_dir)
    # print(number_of_batches)

    if binary:
        file_extentions = '.pkl'
    else:
        file_extentions = '.txt'

    kwargs.update({'batch_size': str(batch_size)})
    job_ids = []
    for n in range(1, number_of_batches+1):
        kwargs.update({'job_id': str(n)})
        if testrespfile_path is not None:
            if cv_folds is not None:
                raise ValueError("""If the test response file is specified
                                 cv_folds must be equal to None""")
            else:
                # specified train/test split
                batch_processing_dir = processing_dir + 'batch_' + str(n) + '/'
                batch_job_name = job_name + '_' + str(n) + '.sh'
                batch_respfile_path = (batch_processing_dir + 'resp_batch_' +
                                       str(n) + file_extentions)
                batch_testrespfile_path = (batch_processing_dir +
                                           'testresp_batch_' + str(n) +
                                           file_extentions)
                batch_job_path = batch_processing_dir + batch_job_name
                if cluster_spec == 'torque':
                    # update the response file
                    kwargs.update({'testrespfile_path': batch_testrespfile_path})
                    bashwrap_nm(batch_processing_dir, python_path,
                                normative_path, batch_job_name, covfile_path,
                                batch_respfile_path, func=func, **kwargs)
                    job_id = qsub_nm(job_path=batch_job_path, log_path=log_path,
                                     memory=memory, duration=duration)
                    job_ids.append(job_id)
                elif cluster_spec == 'sbatch':
                    # update the response file
                    kwargs.update({'testrespfile_path': batch_testrespfile_path})
                    sbatchwrap_nm(batch_processing_dir, python_path,
                                  normative_path, batch_job_name, covfile_path,
                                  batch_respfile_path, func=func, memory=memory,
                                  duration=duration, **kwargs)
                    sbatch_nm(job_path=batch_job_path, log_path=log_path)
                elif cluster_spec == 'new':
                    # this part requires adaptation for a new environment [
                    sbatchwrap_nm(processing_dir=batch_processing_dir,
                                  func=func, **kwargs)
                    sbatch_nm(processing_dir=batch_processing_dir)
                    # ]
        if testrespfile_path is None:
            if testcovfile_path is not None:
                # forward model
                batch_processing_dir = processing_dir + 'batch_' + str(n) + '/'
                batch_job_name = job_name + '_' + str(n) + '.sh'
                batch_respfile_path = (batch_processing_dir + 'resp_batch_' +
                                       str(n) + file_extentions)
                batch_job_path = batch_processing_dir + batch_job_name
                if cluster_spec == 'torque':
                    bashwrap_nm(batch_processing_dir, python_path,
                                normative_path, batch_job_name, covfile_path,
                                batch_respfile_path, func=func, **kwargs)
                    job_id = qsub_nm(job_path=batch_job_path, log_path=log_path,
                                     memory=memory, duration=duration)
                    job_ids.append(job_id)
                elif cluster_spec == 'sbatch':
                    sbatchwrap_nm(batch_processing_dir, python_path,
                                  normative_path, batch_job_name, covfile_path,
                                  batch_respfile_path, func=func, memory=memory,
                                  duration=duration, **kwargs)
                    sbatch_nm(job_path=batch_job_path, log_path=log_path)
                elif cluster_spec == 'new':
                    # this part requires adaptation for a new environment [
                    bashwrap_nm(processing_dir=batch_processing_dir,
                                func=func, **kwargs)
                    qsub_nm(processing_dir=batch_processing_dir)
                    # ]
            else:
                # cross-validation
                batch_processing_dir = processing_dir + 'batch_' + str(n) + '/'
                batch_job_name = job_name + '_' + str(n) + '.sh'
                batch_respfile_path = (batch_processing_dir + 'resp_batch_' +
                                       str(n) + file_extentions)
                batch_job_path = batch_processing_dir + batch_job_name
                if cluster_spec == 'torque':
                    bashwrap_nm(batch_processing_dir, python_path,
                                normative_path, batch_job_name, covfile_path,
                                batch_respfile_path, func=func, **kwargs)
                    job_id = qsub_nm(job_path=batch_job_path, log_path=log_path,
                                     memory=memory, duration=duration)
                    job_ids.append(job_id)
                elif cluster_spec == 'sbatch':
                    sbatchwrap_nm(batch_processing_dir, python_path,
                                  normative_path, batch_job_name, covfile_path,
                                  batch_respfile_path, func=func, memory=memory,
                                  duration=duration, **kwargs)
                    sbatch_nm(job_path=batch_job_path, log_path=log_path)
                elif cluster_spec == 'new':
                    # this part requires adaptation for a new environment [
                    bashwrap_nm(processing_dir=batch_processing_dir,
                                func=func, **kwargs)
                    qsub_nm(processing_dir=batch_processing_dir)
                    # ]

    if interactive:
        check_jobs(job_ids, delay=60)

        success = False
        while (not success):
            success = collect_nm(processing_dir, job_name, func=func,
                                 collect=False, binary=binary,
                                 batch_size=batch_size,
                                 outputsuffix=outputsuffix)
            if success:
                break
            else:
                if interactive == 'query':
                    response = yes_or_no('Rerun the failed jobs?')
                    if response:
                        rerun_nm(processing_dir, log_path=log_path,
                                 memory=memory, duration=duration,
                                 binary=binary, interactive=interactive)
                    else:
                        success = True
                else:
                    print('Rerunning the failed jobs ...')
                    rerun_nm(processing_dir, log_path=log_path,
                             memory=memory, duration=duration,
                             binary=binary, interactive=interactive)

        if interactive == 'query':
            response = yes_or_no('Collect the results?')
            if response:
                success = collect_nm(processing_dir, job_name, func=func,
                                     collect=True, binary=binary,
                                     batch_size=batch_size,
                                     outputsuffix=outputsuffix)
        else:
            print('Collecting the results ...')
            success = collect_nm(processing_dir, job_name, func=func,
                                 collect=True, binary=binary,
                                 batch_size=batch_size,
                                 outputsuffix=outputsuffix)
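# Example (not part of the original module): calling execute_nm with k-fold
# cross-validation instead of a train/test split. Paths are hypothetical;
# cv_folds is passed through **kwargs and testrespfile_path must then be
# omitted (see the ValueError check above).
#
#   execute_nm('/project/nm_processing/', '/opt/conda/bin/python', 'nm_cv',
#              '/project/cov.txt', '/project/resp.txt',
#              batch_size=10, memory='4gb', duration='02:00:00',
#              cv_folds=5, cluster_spec='torque')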
"""routines that are environment independent"""
def split_nm(processing_dir, respfile_path, batch_size, binary, **kwargs):
    '''This function prepares the input files for normative_parallel.

    Basic usage::

        split_nm(processing_dir, respfile_path, batch_size, binary, testrespfile_path)

    :param processing_dir: Full path to the processing dir
    :param respfile_path: Full path to the response file (subjects x features)
    :param batch_size: Number of features in each batch
    :param testrespfile_path: Full path to the test response file (subjects x features)
    :param binary: If True, the response files are in binary (.pkl) format

    :outputs: The creation of a folder structure for batch-wise processing.

    written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford.
    '''

    testrespfile_path = kwargs.pop('testrespfile_path', None)

    dummy, respfile_extension = os.path.splitext(respfile_path)
    if (binary and respfile_extension != '.pkl'):
        raise ValueError("""If binary is True the file format for the
                         response file must be .pkl""")
    elif (binary == False and respfile_extension != '.txt'):
        raise ValueError("""If binary is False the file format for the
                         response file must be .txt""")

    # splits the response file into batches
    if testrespfile_path is None:
        if (binary == False):
            respfile = fileio.load_ascii(respfile_path)
        else:
            respfile = pd.read_pickle(respfile_path)

        respfile = pd.DataFrame(respfile)

        numsub = respfile.shape[1]
        batch_vec = np.arange(0, numsub, batch_size)
        batch_vec = np.append(batch_vec, numsub)

        for n in range(0, (len(batch_vec) - 1)):
            resp_batch = respfile.iloc[:, (batch_vec[n]): batch_vec[n + 1]]
            os.chdir(processing_dir)
            resp = str('resp_batch_' + str(n+1))
            batch = str('batch_' + str(n+1))
            if not os.path.exists(processing_dir + batch):
                os.makedirs(processing_dir + batch)
                os.makedirs(processing_dir + batch + '/Models/')
            if (binary == False):
                fileio.save_pd(resp_batch,
                               processing_dir + batch + '/' + resp + '.txt')
            else:
                resp_batch.to_pickle(processing_dir + batch + '/' + resp + '.pkl',
                                     protocol=PICKLE_PROTOCOL)

    # splits the response and test response files into batches
    else:
        dummy, testrespfile_extension = os.path.splitext(testrespfile_path)
        if (binary and testrespfile_extension != '.pkl'):
            raise ValueError("""If binary is True the file format for the
                             testrespfile file must be .pkl""")
        elif (binary == False and testrespfile_extension != '.txt'):
            raise ValueError("""If binary is False the file format for the
                             testrespfile file must be .txt""")

        if (binary == False):
            respfile = fileio.load_ascii(respfile_path)
            testrespfile = fileio.load_ascii(testrespfile_path)
        else:
            respfile = pd.read_pickle(respfile_path)
            testrespfile = pd.read_pickle(testrespfile_path)

        respfile = pd.DataFrame(respfile)
        testrespfile = pd.DataFrame(testrespfile)

        numsub = respfile.shape[1]
        batch_vec = np.arange(0, numsub, batch_size)
        batch_vec = np.append(batch_vec, numsub)

        for n in range(0, (len(batch_vec) - 1)):
            resp_batch = respfile.iloc[:, (batch_vec[n]): batch_vec[n + 1]]
            testresp_batch = testrespfile.iloc[:, (batch_vec[n]): batch_vec[n + 1]]
            os.chdir(processing_dir)
            resp = str('resp_batch_' + str(n+1))
            testresp = str('testresp_batch_' + str(n+1))
            batch = str('batch_' + str(n+1))
            if not os.path.exists(processing_dir + batch):
                os.makedirs(processing_dir + batch)
                os.makedirs(processing_dir + batch + '/Models/')
            if (binary == False):
                fileio.save_pd(resp_batch,
                               processing_dir + batch + '/' + resp + '.txt')
                fileio.save_pd(testresp_batch,
                               processing_dir + batch + '/' + testresp + '.txt')
            else:
                resp_batch.to_pickle(processing_dir + batch + '/' + resp + '.pkl',
                                     protocol=PICKLE_PROTOCOL)
                testresp_batch.to_pickle(processing_dir + batch + '/' + testresp + '.pkl',
                                         protocol=PICKLE_PROTOCOL)
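# Example (not part of the original module): splitting a text response file
# into batches of 10 features, creating batch_1/, batch_2/, ... under the
# (hypothetical) processing directory, each with its own resp_batch_*,
# testresp_batch_* and Models/ entries.
#
#   split_nm('/project/nm_processing/', '/project/resp.txt',
#            batch_size=10, binary=False,
#            testrespfile_path='/project/testresp.txt')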
def collect_nm(processing_dir, job_name, func='estimate', collect=False,
               binary=False, batch_size=None, outputsuffix='_estimate'):
    '''Function to check and collect all batches.

    Basic usage::

        collect_nm(processing_dir, job_name)

    :param processing_dir: Full path to the processing directory
    :param collect: If True, data is checked for failed batches and collected;
     if False, data is only checked
    :param binary: If True, results are in .pkl format

    :outputs: Text or pkl files containing all results across all batches
     (the combined output, written to disk).

    :returns True: if all batches completed successfully
    :returns False: if batches failed

    written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford.
    '''

    if binary:
        file_extentions = '.pkl'
    else:
        file_extentions = '.txt'

    # detect number of subjects, batches, hyperparameters and CV
    batches = glob.glob(processing_dir + 'batch_*/')

    count = 0
    batch_fail = []

    if (func != 'fit' and func != 'extend' and func != 'merge' and func != 'tune'):
        file_example = []
        # TODO: collect_nm only depends on yhat, thus it does not work when no
        # prediction is made (when the test covariates are not specified).
        for batch in batches:
            if file_example == []:
                file_example = glob.glob(batch + 'yhat' + outputsuffix +
                                         file_extentions)
            else:
                break
        if binary is False:
            file_example = fileio.load(file_example[0])
        else:
            file_example = pd.read_pickle(file_example[0])
        numsubjects = file_example.shape[0]
        try:
            # shape[1] doesn't exist if batch_size=1 and the file is .txt
            batch_size = file_example.shape[1]
        except IndexError:
            batch_size = 1

        # artificially create output files for batches that were not executed
        batch_dirs = glob.glob(processing_dir + 'batch_*/')
        batch_dirs = fileio.sort_nicely(batch_dirs)
        for batch in batch_dirs:
            filepath = glob.glob(batch + 'yhat' + outputsuffix + '*')
            if filepath == []:
                count = count + 1
                batch1 = glob.glob(batch + '/' + job_name + '*.sh')
                print(batch1)
                batch_fail.append(batch1)
                if collect is True:
                    # 1d metrics: pRho is filled with ones, the others with zeros
                    for metric, fill in [('pRho', np.ones), ('Rho', np.zeros),
                                         ('RMSE', np.zeros), ('SMSE', np.zeros),
                                         ('EXPV', np.zeros), ('MSLL', np.zeros),
                                         ('NLL', np.zeros), ('BIC', np.zeros)]:
                        dummy = pd.Series(fill(batch_size))
                        fileio.save(dummy, batch + metric + outputsuffix +
                                    file_extentions)
                    # 2d outputs (subjects x batch_size) filled with zeros
                    for output in ['yhat', 'ys2', 'Z']:
                        dummy = pd.DataFrame(np.zeros([numsubjects, batch_size]))
                        fileio.save(dummy, batch + output + outputsuffix +
                                    file_extentions)
                    if not os.path.isdir(batch + 'Models'):
                        os.mkdir(batch + 'Models')
            else:
                # if more than 10% of yhat is nan then it is a failed batch
                yhat = fileio.load(filepath[0])
                if np.count_nonzero(~np.isnan(yhat)) / (np.prod(yhat.shape)) < 0.9:
                    count = count + 1
                    batch1 = glob.glob(batch + '/' + job_name + '*.sh')
                    print('More than 10% nans in ' + batch1[0])
                    batch_fail.append(batch1)

    else:
        batch_dirs = glob.glob(processing_dir + 'batch_*/')
        batch_dirs = fileio.sort_nicely(batch_dirs)
        for batch in batch_dirs:
            filepath = glob.glob(batch + 'Models/' + 'NM_' + '*' +
                                 outputsuffix + '*')
            if len(filepath) < batch_size:
                count = count + 1
                batch1 = glob.glob(batch + '/' + job_name + '*.sh')
                print(batch1)
                batch_fail.append(batch1)

    # combine all output files across batches
    if collect is True:
        # 1d metrics are stacked along axis 0
        for metric in ['pRho', 'Rho', 'RMSE', 'SMSE', 'EXPV', 'MSLL', 'NLL', 'BIC']:
            filenames = glob.glob(processing_dir + 'batch_*/' + metric +
                                  outputsuffix + '*')
            if filenames:
                filenames = fileio.sort_nicely(filenames)
                dfs = []
                for filename in filenames:
                    if batch_size == 1 and binary is False:
                        # if batch size = 1 and .txt file:
                        # promote the 0d scalar to a 1d array
                        dfs.append(pd.DataFrame(fileio.load(filename)[np.newaxis,]))
                    else:
                        dfs.append(pd.DataFrame(fileio.load(filename)))
                dfs = pd.concat(dfs, ignore_index=True, axis=0)
                fileio.save(dfs, processing_dir + metric + outputsuffix +
                            file_extentions)
                del dfs

        # 2d outputs (subjects x features) are stacked along axis 1
        for output in ['Z', 'yhat', 'ys2']:
            filenames = glob.glob(processing_dir + 'batch_*/' + output +
                                  outputsuffix + '*')
            if filenames:
                filenames = fileio.sort_nicely(filenames)
                dfs = []
                for filename in filenames:
                    dfs.append(pd.DataFrame(fileio.load(filename)))
                dfs = pd.concat(dfs, ignore_index=True, axis=1)
                fileio.save(dfs, processing_dir + output + outputsuffix +
                            file_extentions)
                del dfs

        if (func != 'predict' and func != 'extend' and func != 'merge' and
                func != 'tune'):
            if not os.path.isdir(processing_dir + 'Models') and \
               os.path.exists(os.path.join(batches[0], 'Models')):
                os.mkdir(processing_dir + 'Models')

            meta_filenames = glob.glob(processing_dir + 'batch_*/Models/' +
                                       'meta_data.md')
            mY = []
            sY = []
            X_scalers = []
            Y_scalers = []
            if meta_filenames:
                meta_filenames = fileio.sort_nicely(meta_filenames)
                with open(meta_filenames[0], 'rb') as file:
                    meta_data = pickle.load(file)

                for meta_filename in meta_filenames:
                    with open(meta_filename, 'rb') as file:
                        meta_data = pickle.load(file)
                    mY.append(meta_data['mean_resp'])
                    sY.append(meta_data['std_resp'])
                    if meta_data['inscaler'] in ['standardize', 'minmax',
                                                 'robminmax']:
                        X_scalers.append(meta_data['scaler_cov'])
                    if meta_data['outscaler'] in ['standardize', 'minmax',
                                                  'robminmax']:
                        Y_scalers.append(meta_data['scaler_resp'])
                meta_data['mean_resp'] = np.squeeze(np.column_stack(mY))
                meta_data['std_resp'] = np.squeeze(np.column_stack(sY))
                meta_data['scaler_cov'] = X_scalers
                meta_data['scaler_resp'] = Y_scalers

                with open(os.path.join(processing_dir, 'Models',
                                       'meta_data.md'), 'wb') as file:
                    pickle.dump(meta_data, file, protocol=PICKLE_PROTOCOL)

            batch_dirs = glob.glob(processing_dir + 'batch_*/')
            if batch_dirs:
                batch_dirs = fileio.sort_nicely(batch_dirs)
                for b, batch_dir in enumerate(batch_dirs):
                    src_files = glob.glob(batch_dir + 'Models/NM*' +
                                          outputsuffix + '.pkl')
                    if src_files:
                        src_files = fileio.sort_nicely(src_files)
                        for f, full_file_name in enumerate(src_files):
                            if os.path.isfile(full_file_name):
                                file_name = full_file_name.split('/')[-1]
                                n = file_name.split('_')
                                n[-2] = str(b * batch_size + f)
                                n = '_'.join(n)
                                shutil.copy(full_file_name,
                                            processing_dir + 'Models/' + n)
                    elif func == 'fit':
                        count = count + 1
                        batch1 = glob.glob(batch_dir + '/' + job_name + '*.sh')
                        print('Failed batch: ' + batch1[0])
                        batch_fail.append(batch1)

    # list batches that were not executed
    print('Number of batches that failed: ' + str(count))
    batch_fail_df = pd.DataFrame(batch_fail)
    if file_extentions == '.txt':
        fileio.save_pd(batch_fail_df, processing_dir + 'failed_batches' +
                       file_extentions)
    else:
        fileio.save(batch_fail_df, processing_dir + 'failed_batches' +
                    file_extentions)

    if not batch_fail:
        return True
    else:
        return False
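# Example (not part of the original module): first check the batches without
# combining anything, then collect once everything has finished. Paths and the
# job name are hypothetical.
#
#   ok = collect_nm('/project/nm_processing/', 'nm_job',
#                   collect=False, binary=False, batch_size=10,
#                   outputsuffix='_estimate')
#   if ok:
#       collect_nm('/project/nm_processing/', 'nm_job',
#                  collect=True, binary=False, batch_size=10,
#                  outputsuffix='_estimate')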
def delete_nm(processing_dir, binary=False):
    '''This function deletes all processing for normative modelling and just
    keeps the combined output.

    Basic usage::

        delete_nm(processing_dir)

    :param processing_dir: Full path to the processing directory.
    :param binary: If True, results are in .pkl format.

    written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford.
    '''

    if binary:
        file_extentions = '.pkl'
    else:
        file_extentions = '.txt'
    for file in glob.glob(processing_dir + 'batch_*/'):
        shutil.rmtree(file)
    if os.path.exists(processing_dir + 'failed_batches' + file_extentions):
        os.remove(processing_dir + 'failed_batches' + file_extentions)
# all routines below are environment dependent and require adaptation for
# novel environments -> copy those routines and adapt them in accordance
# with your environment
def bashwrap_nm(processing_dir, python_path, normative_path, job_name,
                covfile_path, respfile_path, func='estimate', **kwargs):
    '''This function wraps normative modelling into a bash script to run it
    on a torque cluster system.

    Basic usage::

        bashwrap_nm(processing_dir, python_path, normative_path, job_name,
                    covfile_path, respfile_path)

    :param processing_dir: Full path to the processing dir
    :param python_path: Full path to the python distribution
    :param normative_path: Full path to the normative.py
    :param job_name: Name for the bash script that is the output of this function
    :param covfile_path: Full path to a .txt file that contains all covariates
     (subjects x covariates) for the response file
    :param respfile_path: Full path to a .txt file that contains all features
     (subjects x features)
    :param cv_folds: Number of cross validations
    :param testcovfile_path: Full path to a .txt file that contains all
     covariates (subjects x covariates) for the test response file
    :param testrespfile_path: Full path to a .txt file that contains all test features
    :param alg: which algorithm to use
    :param configparam: configuration parameters for this algorithm

    :outputs: A bash.sh file containing the commands for normative modelling
     saved to the processing directory (written to disk).

    written by (primarily) T Wolfers, (adapted) S Rutherford.
    '''

    # here we use pop rather than get to remove the arguments as they are used
    cv_folds = kwargs.pop('cv_folds', None)
    testcovfile_path = kwargs.pop('testcovfile_path', None)
    testrespfile_path = kwargs.pop('testrespfile_path', None)
    alg = kwargs.pop('alg', None)
    configparam = kwargs.pop('configparam', None)

    # change to processing dir
    os.chdir(processing_dir)
    output_changedir = ['cd ' + processing_dir + '\n']

    bash_lines = '#!/bin/bash\n'
    bash_cores = 'export OMP_NUM_THREADS=1\n'
    bash_environment = [bash_lines + bash_cores]

    # create the call to normative.py
    if (testrespfile_path is not None) and (testcovfile_path is not None):
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -t ' + testcovfile_path + ' -r ' +
                    testrespfile_path + ' -f ' + func]
    elif (testrespfile_path is None) and (testcovfile_path is not None):
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -t ' + testcovfile_path + ' -f ' + func]
    elif cv_folds is not None:
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -k ' + str(cv_folds) + ' -f ' + func]
    elif func != 'estimate':
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -f ' + func]
    else:
        raise ValueError("""For 'estimate' function either testcov or cvfold
                         must be specified.""")

    # add algorithm-specific parameters
    if alg is not None:
        job_call = [job_call[0] + ' -a ' + alg]
        if configparam is not None:
            job_call = [job_call[0] + ' -x ' + str(configparam)]

    # add standardization flag if it is false
    # if not standardize:
    #     job_call = [job_call[0] + ' -s']

    # add responses file
    job_call = [job_call[0] + ' ' + respfile_path]

    # add in optional arguments
    for k in kwargs:
        job_call = [job_call[0] + ' ' + k + '=' + str(kwargs[k])]

    # write the bash file into the processing dir
    with open(processing_dir + job_name, 'w') as bash_file:
        bash_file.writelines(bash_environment + output_changedir +
                             job_call + ["\n"])

    # change permissions for the bash.sh file
    os.chmod(processing_dir + job_name, 0o770)
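# Example (not part of the original module): writing a torque job script for
# a single batch. The resulting nm_job_1.sh contains the cd command and the
# normative.py call built above. Paths are hypothetical and the algorithm
# name is simply passed through to normative.py.
#
#   bashwrap_nm('/project/nm_processing/batch_1/', '/opt/conda/bin/python',
#               '/opt/conda/lib/python3.9/site-packages/pcntoolkit/normative.py',
#               'nm_job_1.sh', '/project/cov.txt',
#               '/project/nm_processing/batch_1/resp_batch_1.txt',
#               func='estimate',
#               testcovfile_path='/project/testcov.txt',
#               testrespfile_path='/project/nm_processing/batch_1/testresp_batch_1.txt',
#               alg='gpr')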
def qsub_nm(job_path, log_path, memory, duration):
    '''This function submits a job.sh script to the torque cluster using the
    qsub command.

    Basic usage::

        qsub_nm(job_path, log_path, memory, duration)

    :param job_path: Full path to the job.sh file.
    :param memory: Memory requirements written as string, for example 4gb or 500mb.
    :param duration: The approximate duration of the job, a string with
     HH:MM:SS, for example 01:01:01.

    :outputs: Submission of the job to the (torque) cluster.

    written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford.
    '''

    # create the qsub command
    if log_path is None:
        qsub_call = ['echo ' + job_path + ' | qsub -N ' + job_path + ' -l ' +
                     'procs=1' + ',mem=' + memory + ',walltime=' + duration]
    else:
        qsub_call = ['echo ' + job_path + ' | qsub -N ' + job_path + ' -l ' +
                     'procs=1' + ',mem=' + memory + ',walltime=' + duration +
                     ' -o ' + log_path + ' -e ' + log_path]

    # submit the job to the cluster
    # call(qsub_call, shell=True)
    job_id = check_output(qsub_call, shell=True).decode(
        sys.stdout.encoding).replace("\n", "")

    return job_id
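# Example (not part of the original module): submitting the script written by
# bashwrap_nm and keeping the returned torque job id for later monitoring with
# check_jobs (defined below). Paths are hypothetical.
#
#   job_id = qsub_nm('/project/nm_processing/batch_1/nm_job_1.sh',
#                    log_path='/project/logs/', memory='4gb',
#                    duration='01:00:00')
#   check_jobs([job_id], delay=60)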
def rerun_nm(processing_dir, log_path, memory, duration, binary=False,
             interactive=False):
    '''This function reruns all failed batches in processing_dir after
    collect_nm has identified the failed batches.

    Basic usage::

        rerun_nm(processing_dir, log_path, memory, duration)

    :param processing_dir: Full path to the processing directory
    :param memory: Memory requirements written as string, for example 4gb or 500mb.
    :param duration: The approximate duration of the job, a string with
     HH:MM:SS, for example 01:01:01.

    written by (primarily) T Wolfers, (adapted) SM Kia, (adapted) S Rutherford.
    '''

    job_ids = []

    if binary:
        file_extentions = '.pkl'
        failed_batches = fileio.load(processing_dir + 'failed_batches' +
                                     file_extentions)
        shape = failed_batches.shape
        for n in range(0, shape[0]):
            jobpath = failed_batches[n, 0]
            print(jobpath)
            job_id = qsub_nm(job_path=jobpath, log_path=log_path,
                             memory=memory, duration=duration)
            job_ids.append(job_id)
    else:
        file_extentions = '.txt'
        failed_batches = fileio.load_pd(processing_dir + 'failed_batches' +
                                        file_extentions)
        shape = failed_batches.shape
        for n in range(0, shape[0]):
            jobpath = failed_batches.iloc[n, 0]
            print(jobpath)
            job_id = qsub_nm(job_path=jobpath, log_path=log_path,
                             memory=memory, duration=duration)
            job_ids.append(job_id)

    if interactive:
        check_jobs(job_ids, delay=60)
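# Example (not part of the original module): after collect_nm has written
# failed_batches.txt, resubmit the failed batches and wait for them to finish.
# The directory and log path are hypothetical.
#
#   rerun_nm('/project/nm_processing/', log_path='/project/logs/',
#            memory='4gb', duration='01:00:00', binary=False,
#            interactive=True)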
# COPY the routines above and adapt them to your cluster:
# bashwrap_nm; qsub_nm; rerun_nm
def sbatchwrap_nm(processing_dir, python_path, normative_path, job_name,
                  covfile_path, respfile_path, memory, duration,
                  func='estimate', **kwargs):
    '''This function wraps normative modelling into a bash script to run it
    on a SLURM cluster system.

    Basic usage::

        sbatchwrap_nm(processing_dir, python_path, normative_path, job_name,
                      covfile_path, respfile_path, memory, duration)

    :param processing_dir: Full path to the processing dir
    :param python_path: Full path to the python distribution
    :param normative_path: Full path to the normative.py
    :param job_name: Name for the bash script that is the output of this function
    :param covfile_path: Full path to a .txt file that contains all covariates
     (subjects x covariates) for the response file
    :param respfile_path: Full path to a .txt file that contains all features
     (subjects x features)
    :param cv_folds: Number of cross validations
    :param testcovfile_path: Full path to a .txt file that contains all
     covariates (subjects x covariates) for the test response file
    :param testrespfile_path: Full path to a .txt file that contains all test features
    :param alg: which algorithm to use
    :param configparam: configuration parameters for this algorithm

    :outputs: A bash.sh file containing the commands for normative modelling
     saved to the processing directory (written to disk).

    written by (primarily) T Wolfers, (adapted) S Rutherford
    '''

    # here we use pop rather than get to remove the arguments as they are used
    cv_folds = kwargs.pop('cv_folds', None)
    testcovfile_path = kwargs.pop('testcovfile_path', None)
    testrespfile_path = kwargs.pop('testrespfile_path', None)
    alg = kwargs.pop('alg', None)
    configparam = kwargs.pop('configparam', None)

    # change to processing dir
    os.chdir(processing_dir)
    output_changedir = ['cd ' + processing_dir + '\n']

    sbatch_init = '#!/bin/bash\n'
    sbatch_jobname = '#SBATCH --job-name=' + processing_dir + '\n'
    sbatch_account = '#SBATCH --account=p33_norment\n'
    sbatch_nodes = '#SBATCH --nodes=1\n'
    sbatch_tasks = '#SBATCH --ntasks=1\n'
    sbatch_time = '#SBATCH --time=' + str(duration) + '\n'
    sbatch_memory = '#SBATCH --mem-per-cpu=' + str(memory) + '\n'
    sbatch_module = 'module purge\n'
    sbatch_anaconda = 'module load anaconda3\n'
    sbatch_exit = 'set -o errexit\n'

    # echo -n "This script is running on "
    # hostname

    bash_environment = [sbatch_init + sbatch_jobname + sbatch_account +
                        sbatch_nodes + sbatch_tasks + sbatch_time +
                        sbatch_module + sbatch_anaconda]

    # create the call to normative.py
    if (testrespfile_path is not None) and (testcovfile_path is not None):
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -t ' + testcovfile_path + ' -r ' +
                    testrespfile_path + ' -f ' + func]
    elif (testrespfile_path is None) and (testcovfile_path is not None):
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -t ' + testcovfile_path + ' -f ' + func]
    elif cv_folds is not None:
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -k ' + str(cv_folds) + ' -f ' + func]
    elif func != 'estimate':
        job_call = [python_path + ' ' + normative_path + ' -c ' +
                    covfile_path + ' -f ' + func]
    else:
        raise ValueError("""For 'estimate' function either testcov or cvfold
                         must be specified.""")

    # add algorithm-specific parameters
    if alg is not None:
        job_call = [job_call[0] + ' -a ' + alg]
        if configparam is not None:
            job_call = [job_call[0] + ' -x ' + str(configparam)]

    # add standardization flag if it is false
    # if not standardize:
    #     job_call = [job_call[0] + ' -s']

    # add responses file
    job_call = [job_call[0] + ' ' + respfile_path]

    # add in optional arguments
    for k in kwargs:
        job_call = [job_call[0] + ' ' + k + '=' + str(kwargs[k])]

    # write the bash file into the processing dir
    with open(processing_dir + job_name, 'w') as bash_file:
        bash_file.writelines(bash_environment + output_changedir +
                             job_call + ["\n"] + [sbatch_exit])

    # change permissions for the bash.sh file
    os.chmod(processing_dir + job_name, 0o770)
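# Example (not part of the original module): the SLURM analogue of the
# bashwrap_nm example, followed by submission with sbatch_nm (defined below).
# Paths are hypothetical; note that the #SBATCH --account line written above
# is site-specific and will usually need to be adapted.
#
#   sbatchwrap_nm('/project/nm_processing/batch_1/', '/opt/conda/bin/python',
#                 '/opt/conda/lib/python3.9/site-packages/pcntoolkit/normative.py',
#                 'nm_job_1.sh', '/project/cov.txt',
#                 '/project/nm_processing/batch_1/resp_batch_1.txt',
#                 memory='4gb', duration='01:00:00', func='estimate',
#                 testcovfile_path='/project/testcov.txt')
#   sbatch_nm('/project/nm_processing/batch_1/nm_job_1.sh', log_path=None)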
def sbatch_nm(job_path, log_path):
    '''This function submits a job.sh script to a SLURM cluster using the
    sbatch command.

    Basic usage::

        sbatch_nm(job_path, log_path)

    :param job_path: Full path to the job.sh file
    :param log_path: The logs are currently stored in the working dir

    :outputs: Submission of the job to the (SLURM) cluster.

    written by (primarily) T Wolfers, (adapted) S Rutherford.
    '''

    # create the sbatch command
    sbatch_call = ['sbatch ' + job_path]

    # submit the job to the cluster
    call(sbatch_call, shell=True)
def sbatchrerun_nm(processing_dir, memory, duration, new_memory=False,
                   new_duration=False, binary=False, **kwargs):
    '''This function reruns all failed batches in processing_dir after
    collect_nm has identified the failed batches.

    Basic usage::

        sbatchrerun_nm(processing_dir, memory, duration)

    :param processing_dir: Full path to the processing directory.
    :param memory: Memory requirements written as string, for example 4gb or 500mb.
    :param duration: The approximate duration of the job, a string with
     HH:MM:SS, for example 01:01:01.
    :param new_memory: If you want to change the memory, indicate it here.
    :param new_duration: If you want to change the duration, indicate it here.

    :outputs: Re-runs failed batches.

    written by (primarily) T Wolfers, (adapted) S Rutherford.
    '''

    log_path = kwargs.pop('log_path', None)

    if binary:
        file_extentions = '.pkl'
        failed_batches = fileio.load(processing_dir + 'failed_batches' +
                                     file_extentions)
        shape = failed_batches.shape
        for n in range(0, shape[0]):
            jobpath = failed_batches[n, 0]
            print(jobpath)
            if new_duration != False:
                with fileinput.FileInput(jobpath, inplace=True) as file:
                    for line in file:
                        print(line.replace(duration, new_duration), end='')
            if new_memory != False:
                with fileinput.FileInput(jobpath, inplace=True) as file:
                    for line in file:
                        print(line.replace(memory, new_memory), end='')
            sbatch_nm(jobpath, log_path)
    else:
        file_extentions = '.txt'
        failed_batches = fileio.load_pd(processing_dir + 'failed_batches' +
                                        file_extentions)
        shape = failed_batches.shape
        for n in range(0, shape[0]):
            jobpath = failed_batches.iloc[n, 0]
            print(jobpath)
            if new_duration != False:
                with fileinput.FileInput(jobpath, inplace=True) as file:
                    for line in file:
                        print(line.replace(duration, new_duration), end='')
            if new_memory != False:
                with fileinput.FileInput(jobpath, inplace=True) as file:
                    for line in file:
                        print(line.replace(memory, new_memory), end='')
            sbatch_nm(jobpath, log_path)
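# Example (not part of the original module): resubmitting failed SLURM batches
# with a longer walltime. new_duration rewrites the matching #SBATCH --time
# value in each failed job script before resubmission. The directory is
# hypothetical.
#
#   sbatchrerun_nm('/project/nm_processing/', memory='4gb',
#                  duration='01:00:00', new_duration='04:00:00',
#                  binary=False)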
def retrieve_jobs():
    """
    A utility function to retrieve task status from the output of qstat.

    :return: a dictionary of jobs.
    """

    output = check_output('qstat', shell=True).decode(sys.stdout.encoding)
    output = output.split('\n')
    jobs = dict()
    for line in output[2:-1]:
        (Job_ID, Job_Name, User, Wall_Time, Status, Queue) = line.split()
        jobs[Job_ID] = dict()
        jobs[Job_ID]['name'] = Job_Name
        jobs[Job_ID]['walltime'] = Wall_Time
        jobs[Job_ID]['status'] = Status

    return jobs
def check_job_status(jobs):
    """
    A utility function to count the tasks with different status.

    :param jobs: List of job ids.
    :return: the number of tasks that are queued, running, completed etc.
    """
    running_jobs = retrieve_jobs()

    r = 0
    c = 0
    q = 0
    u = 0
    for job in jobs:
        try:
            if running_jobs[job]['status'] == 'C':
                c += 1
            elif running_jobs[job]['status'] == 'Q':
                q += 1
            elif running_jobs[job]['status'] == 'R':
                r += 1
            else:
                u += 1
        except KeyError:
            # the job has probably finished in the meantime.
            c += 1
            continue

    print('Total Jobs:%d, Queued:%d, Running:%d, Completed:%d, Unknown:%d'
          % (len(jobs), q, r, c, u))
    return q, r, c, u
def check_jobs(jobs, delay=60):
    """
    A utility function for checking the status of submitted jobs.

    :param jobs: list of job ids.
    :param delay: the delay (in seconds) between two consecutive checks,
     defaults to 60.
    """

    n = len(jobs)

    while (True):
        q, r, c, u = check_job_status(jobs)
        if c == n:
            print('All jobs are completed!')
            break
        time.sleep(delay)
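# Example (not part of the original module): polling a list of torque job ids
# returned by qsub_nm every two minutes until qstat reports all of them as
# completed. The job ids are hypothetical.
#
#   job_ids = ['1234567.cluster', '1234568.cluster']
#   check_jobs(job_ids, delay=120)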