pcntoolkit.util.runner#

Classes#

Runner

Initialize the runner.

Functions#

load_and_execute(args)

Load a callable and data from a pickle file and execute it.

Module Contents#

class Runner(parallelize: bool = False, job_type: Literal['torque', 'slurm'] = 'slurm', n_batches: int | None = None, batch_size: int | None = None, n_cores: int = 1, time_limit: str | int = '00:05:00', memory: str = '5GB', max_retries: int = 3, environment: str | None = None, cross_validate: bool = False, cv_folds: int = 5, preamble: str = 'module load anaconda3', log_dir: str | None = None, temp_dir: str | None = None)#

Initialize the runner.

Parameters:
  • parallelize (bool, optional) – Whether to parallelize the jobs.

  • job_type (Literal[``”torque”, ``"slurm"], optional) – The type of job to use.

  • n_batches (int, optional) – The number of jobs to run in parallel.

  • n_cores (int, optional) – The number of cores to use for each job.

  • time_limit (str | int, optional) – The time limit for each job.

  • memory (str, optional) – The memory to use for each job.

  • max_retries (int, optional) – The maximum number of retries for each job.

  • environment (str, optional) – The environment to use for each job.

  • cross_validate (bool, optional) – Whether to cross-validate the model.

  • cv_folds (int, optional) – The number of folds to use for cross-validation.

  • preamble (str, optional) – The preamble to use for each job.

  • log_dir (str, optional) – The directory to save the logs to.

  • temp_dir (str, optional) – The directory to save the temporary files to.

check_job_status(job_name: str) tuple[bool, bool, str | None]#

Check if a job has failed by looking for success file.

Returns:

(is_running, finished_with_error, error_message) If job is still running, returns (True, False, None) If job finished successfully, returns (False, False, None) If job failed, returns (False, True, error_message)

Return type:

tuple[bool, bool, Optional[str]]

check_jobs_status() tuple[Dict[str, str], Dict[str, str], Dict[str, str]]#

Check all jobs in active_job_ids for errors.

Returns:

A tuple containing: - A dictionary mapping job names to job IDs for running jobs - A dictionary mapping job names to error messages for failed jobs - A dictionary mapping job names to job IDs for finished jobs

Return type:

tuple[Dict[str, str], Dict[str, str], Dict[str, str]]

create_temp_and_log_dir()#
extend(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#

Extend a normative model on a dataset.

Parameters:
  • model (NormativeModel) – The normative model to extend.

  • data (NormData) – The data to extend the model on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.

Returns:

The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

extend_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#

Extend a normative model on a dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to extend.

  • fit_data (NormData) – The data to extend the model on.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – If false, the function will dispatch the jobs and return.

Returns:

The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

fit(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#

Fit a normative model on a dataset.

Parameters:
  • model (NormBase) – The normative model to fit.

  • data (NormData) – The data to fit the model on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object, it will have to be loaded manually using the load function when the jobs are done.

Returns:

The fitted model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

fit_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#

Fit a normative model on a dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to fit.

  • fit_data (NormData) – The data to fit the model on.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish, then load the model into the model object If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object, it will have to be loaded manually using the load function when the jobs are done.

Returns:

The fitted and model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

classmethod from_args(args: dict) Runner#
get_all_job_file_paths(job_name)#
get_data_path(job_name)#
get_extend_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
get_extend_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
get_fit_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#

Returns a callable that fits a model on a chunk of data

get_fit_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#

Returns a callable that fits a model on a chunk of data and predicts on another chunk of data

get_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#

Loads each fold model and predicts on the corresponding fold of data. Model n is used to predict on fold n.

get_python_callable_path(job_name)#
get_transfer_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#

Returns a callable that transfers a model on a chunk of data

get_transfer_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
load_data(data_source: pcntoolkit.dataio.norm_data.NormData, fold_index: int | None = 0) None#
classmethod load_from_state(runner_file: str) Runner#

Load a runner from a saved state.

Parameters:

runner_file (str) – Path to the runner state file

Returns:

A runner instance with the saved state

Return type:

Runner

load_model(fold_index: int | None = 0, into: pcntoolkit.normative_model.NormativeModel | None = None) pcntoolkit.normative_model.NormativeModel#
predict(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#

Predict on a dataset.

Parameters:
  • model (NormativeModel) – The normative model to predict on.

  • data (NormData) – The data to predict on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish.

Return type:

None. If you want to load the model, use the runner.load_model function.

re_submit_failed_jobs(observe: bool = True) None#
register_fold_indices(save_dir: str, i_fold: int, indices: tuple[int, int])#
save() None#

Save the runner state to a JSON file in the save directory.

save_callable_and_data(job_name: int | str, fn: Callable, chunk: tuple[pcntoolkit.dataio.norm_data.NormData] | tuple[pcntoolkit.dataio.norm_data.NormData, pcntoolkit.dataio.norm_data.NormData | None]) tuple[str, str]#
set_task_id(task_name: str, model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData)#
submit_jobs(fn: Callable, first_data_source: pcntoolkit.dataio.norm_data.NormData, second_data_source: pcntoolkit.dataio.norm_data.NormData | None = None, mode: Literal['unary', 'binary'] = 'unary') None#

Submit jobs to the job scheduler.

The predict_data argument is optional, and if it is not provided, None is passed to the function.

Parameters:
  • fn (Callable) – Function to call. It should take two arguments.

  • fit_data (NormData) – Data to fit the model on

  • predict_data (Optional[NormData], optional) – Data to predict on, by default None

transfer(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#

Transfer a normative model to a new dataset.

Parameters:
  • model (NormativeModel) – The normative model to transfer.

  • data (NormData) – The data to transfer the model to.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then return the transfered model. If false, the function will dispatch the jobs and return.

Returns:

The transfered model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormBase | None

transfer_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#

Transfer a normative model to a new dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to transfer.

  • fit_data (NormData) – The data to transfer the model to.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.

Returns:

The transfered model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormBase | No   ne

wait_or_finish(observe: bool, into: pcntoolkit.normative_model.NormativeModel | None = None, *data_sources) pcntoolkit.normative_model.NormativeModel | None#
wrap_in_job(job_name, python_callable_path, data_path)#
wrap_in_slurm_job(job_name: int | str, python_callable_path: str, data_path: str) list[str]#
wrap_in_torque_job(job_name: int | str, python_callable_path: str, data_path: str) list[str]#
active_jobs: Dict[str, str]#
batch_size: int | None = 2#
cross_validate: bool = False#
cv_folds: int = 5#
environment: str = None#
failed_jobs: Dict[str, str]#
job_commands: Dict[str, list[str]]#
job_observer = None#
job_type: str = 'local'#
log_dir: str = ''#
max_retries: int = 3#
memory: str = '5gb'#
n_batches: int | None = None#
n_cores: int = 1#
parallelize: bool = True#
preamble: str = 'module load anaconda3'#
save_dir = ''#
task_id = ''#
temp_dir: str = ''#
time_limit_seconds: int = 300#
time_limit_str: str | int = '00:05:00'#
unique_log_dir = ''#
unique_temp_dir = ''#
load_and_execute(args)#

Load a callable and data from a pickle file and execute it.

Parameters:

args (list[str]) – A list of arguments. The first argument is the path to the callable. The second argument is the path to the data. The third argument is the max number of retries.