pcntoolkit.util.runner

Classes

Runner

Initialize the runner.

Functions

load_and_execute(args)

Load a callable and data from a pickle file and execute it.

Module Contents

class Runner(parallelize: bool = False, job_type: Literal['torque', 'slurm'] = 'slurm', n_batches: int | None = None, batch_size: int | None = None, n_cores: int = 1, time_limit: str | int = '00:05:00', memory: str = '5GB', max_retries: int = 3, environment: str | None = None, cross_validate: bool = False, cv_folds: int = 5, preamble: str = 'module load anaconda3', log_dir: str | None = None, temp_dir: str | None = None)

Initialize the runner.

Parameters:
  • parallelize (bool, optional) – Whether to parallelize the jobs.

  • job_type (Literal["torque", "slurm"], optional) – The type of job to use.

  • n_batches (int, optional) – The number of jobs to run in parallel.

  • n_cores (int, optional) – The number of cores to use for each job.

  • time_limit (str | int, optional) – The time limit for each job.

  • memory (str, optional) – The memory to use for each job.

  • max_retries (int, optional) – The maximum number of retries for each job.

  • environment (str, optional) – The environment to use for each job.

  • cross_validate (bool, optional) – Whether to cross-validate the model.

  • cv_folds (int, optional) – The number of folds to use for cross-validation.

  • preamble (str, optional) – The preamble to use for each job.

  • log_dir (str, optional) – The directory to save the logs to.

  • temp_dir (str, optional) – The directory to save the temporary files to.
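The signature accepts time_limit as either a string or an integer, and the class attributes list time_limit_str = '00:05:00' next to time_limit_seconds = 300. The following is a minimal sketch of how the two forms could relate. It assumes the string is in HH:MM:SS format and that an integer is already a number of seconds. The helper name time_limit_to_seconds is hypothetical and is not part of pcntoolkit.

```python
from typing import Union


def time_limit_to_seconds(time_limit: Union[str, int]) -> int:
    """Convert an 'HH:MM:SS' string, or an integer number of seconds, to seconds.

    Hypothetical helper for illustration; the Runner may convert differently.
    """
    if isinstance(time_limit, int):
        # Assumption: an integer time limit is already expressed in seconds.
        return time_limit
    hours, minutes, seconds = (int(part) for part in time_limit.split(":"))
    return hours * 3600 + minutes * 60 + seconds


default_limit = time_limit_to_seconds("00:05:00")
print(default_limit)  # 300, matching the documented time_limit_seconds default
```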

check_job_status(job_name: str) tuple[bool, bool, str | None]

Check whether a job has failed by looking for a success file.

Returns:

A tuple (is_running, finished_with_error, error_message):
  • If the job is still running, returns (True, False, None).

  • If the job finished successfully, returns (False, False, None).

  • If the job failed, returns (False, True, error_message).

Return type:

tuple[bool, bool, Optional[str]]
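As an illustration of the three documented return values, the sketch below turns such a tuple into a readable label. The function describe_job_status is a hypothetical helper, not part of pcntoolkit; it only relies on the tuple layout described above.

```python
from typing import Optional, Tuple


def describe_job_status(status: Tuple[bool, bool, Optional[str]]) -> str:
    """Map an (is_running, finished_with_error, error_message) tuple to a label."""
    is_running, finished_with_error, error_message = status
    if is_running:
        return "running"
    if finished_with_error:
        return f"failed: {error_message}"
    return "finished"
```

In real use the tuple would come from a call such as runner.check_job_status(job_name) on an existing Runner instance.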

check_jobs_status() tuple[Dict[str, str], Dict[str, str], Dict[str, str]]

Check all jobs in active_job_ids for errors.

Returns:

A tuple containing:
  • A dictionary mapping job names to job IDs for running jobs.

  • A dictionary mapping job names to error messages for failed jobs.

  • A dictionary mapping job names to job IDs for finished jobs.

Return type:

tuple[Dict[str, str], Dict[str, str], Dict[str, str]]
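The three dictionaries lend themselves to a simple polling loop. The sketch below keeps polling until no job is reported as running. The function wait_until_done, its poll_interval and max_polls parameters, and the fake job names and IDs are all hypothetical; the loop takes any zero-argument callable that returns the documented tuple, so a bound runner.check_jobs_status could be passed in its place.

```python
import time
from typing import Callable, Dict, Tuple

StatusDicts = Tuple[Dict[str, str], Dict[str, str], Dict[str, str]]


def wait_until_done(
    check_jobs_status: Callable[[], StatusDicts],
    poll_interval: float = 1.0,
    max_polls: int = 100,
) -> StatusDicts:
    """Poll until the 'running' dictionary is empty, then return the last result."""
    for _ in range(max_polls):
        running, failed, finished = check_jobs_status()
        if not running:
            return running, failed, finished
        time.sleep(poll_interval)
    raise TimeoutError(f"jobs still running after {max_polls} polls")


# Stand-in for a real runner: two polls, the second reporting all jobs done.
_fake_responses = iter(
    [
        ({"job_0": "101", "job_1": "102"}, {}, {}),
        ({}, {"job_1": "out of memory"}, {"job_0": "101"}),
    ]
)
running, failed, finished = wait_until_done(
    lambda: next(_fake_responses), poll_interval=0.0
)
```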

create_temp_and_log_dir()
extend(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None

Extend a normative model on a dataset.

Parameters:
  • model (NormativeModel) – The normative model to extend.

  • data (NormData) – The data to extend the model on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.

Returns:

The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

extend_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None

Extend a normative model on a dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to extend.

  • fit_data (NormData) – The data to extend the model on.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

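  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and return the model object. If false, the function will dispatch the jobs and return.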

Returns:

The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

fit(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None

Fit a normative model on a dataset.

Parameters:
  • model (NormativeModel) – The normative model to fit.

  • data (NormData) – The data to fit the model on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object; it will have to be loaded manually using the load function when the jobs are done.

Returns:

The fitted model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None
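The non-blocking workflow described for observe can be sketched with two small helpers. Both functions are hypothetical and use only the method signatures listed on this page (fit, check_jobs_status, load_model). The sketch assumes that "the load function" refers to Runner.load_model and that its default fold_index is appropriate; neither assumption is confirmed here.

```python
def fit_in_background(runner, model, data, save_dir=None):
    """Dispatch the fit jobs without waiting for them.

    Per the documentation above, fit returns None when observe is false.
    """
    return runner.fit(model, data, save_dir=save_dir, observe=False)


def load_when_finished(runner, model):
    """Load the fitted model once no jobs are running.

    Returns None while jobs are still running and raises if any job failed.
    """
    running, failed, _finished = runner.check_jobs_status()
    if running:
        return None
    if failed:
        raise RuntimeError(f"jobs failed: {failed}")
    # Assumption: load_model(into=model) is the manual loading step.
    return runner.load_model(into=model)
```

A caller would run fit_in_background once and then call load_when_finished at intervals until it returns a model.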

fit_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None

Fit a normative model on a dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to fit.

  • fit_data (NormData) – The data to fit the model on.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object; it will have to be loaded manually using the load function when the jobs are done.

Returns:

The fitted model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

classmethod from_args(args: dict) Runner
get_all_job_file_paths(job_name)
get_data_path(job_name)
get_extend_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable
get_extend_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable
get_fit_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable

Returns a callable that fits a model on a chunk of data

get_fit_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable

Returns a callable that fits a model on a chunk of data and predicts on another chunk of data

get_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable

Loads each fold model and predicts on the corresponding fold of data. Model n is used to predict on fold n.

get_python_callable_path(job_name)
get_transfer_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable

Returns a callable that transfers a model on a chunk of data

get_transfer_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable
load_data(data_source: pcntoolkit.dataio.norm_data.NormData, fold_index: int | None = 0) None
classmethod load_from_state(runner_file: str) Runner

Load a runner from a saved state.

Parameters:

runner_file (str) – Path to the runner state file

Returns:

A runner instance with the saved state

Return type:

Runner

load_model(fold_index: int | None = 0, into: pcntoolkit.normative_model.NormativeModel | None = None) pcntoolkit.normative_model.NormativeModel
predict(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None

Predict on a dataset.

Parameters:
  • model (NormativeModel) – The normative model to predict with.

  • data (NormData) – The data to predict on.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish.

Return type:

None. If you want to load the model, use the runner.load_model function.

re_submit_failed_jobs(observe: bool = True) None
register_fold_indices(save_dir: str, i_fold: int, indices: tuple[int, int])
save() None

Save the runner state to a JSON file in the save directory.
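The save and load_from_state methods form a round trip through a JSON file. The sketch below illustrates that pattern with the standard library only. The function names, the file name runner_state.json, and the choice of fields are all assumptions made for illustration; the state that the Runner actually persists is not specified here.

```python
import json
import tempfile
from pathlib import Path


def save_state(state: dict, save_dir: str, filename: str = "runner_state.json") -> str:
    """Write a JSON-serializable state dictionary into save_dir and return its path."""
    path = Path(save_dir) / filename
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(state, indent=2))
    return str(path)


def load_state(path: str) -> dict:
    """Read a state dictionary back from a JSON file."""
    return json.loads(Path(path).read_text())


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical subset of the attributes listed for the Runner class.
    state = {
        "job_type": "slurm",
        "n_cores": 1,
        "memory": "5GB",
        "active_jobs": {"job_0": "101"},
    }
    restored = load_state(save_state(state, tmp))
```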

save_callable_and_data(job_name: int | str, fn: Callable, chunk: tuple[pcntoolkit.dataio.norm_data.NormData] | tuple[pcntoolkit.dataio.norm_data.NormData, pcntoolkit.dataio.norm_data.NormData | None]) tuple[str, str]
set_task_id(task_name: str, model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData)
submit_jobs(fn: Callable, first_data_source: pcntoolkit.dataio.norm_data.NormData, second_data_source: pcntoolkit.dataio.norm_data.NormData | None = None, mode: Literal['unary', 'binary'] = 'unary') None

Submit jobs to the job scheduler.

The second_data_source argument is optional; if it is not provided, None is passed to the function.

Parameters:
  • fn (Callable) – Function to call. It should take two arguments.

  • first_data_source (NormData) – The first data source, for example the data to fit the model on.

  • second_data_source (Optional[NormData], optional) – The second data source, for example the data to predict on, by default None.

transfer(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None

Transfer a normative model to a new dataset.

Parameters:
  • model (NormativeModel) – The normative model to transfer.

  • data (NormData) – The data to transfer the model to.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then return the transferred model. If false, the function will dispatch the jobs and return.

Returns:

The transferred model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

transfer_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None

Transfer a normative model to a new dataset and predict on another dataset.

Parameters:
  • model (NormativeModel) – The normative model to transfer.

  • fit_data (NormData) – The data to transfer the model to.

  • predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.

  • save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.

  • observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.

Returns:

The transferred model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.

Return type:

NormativeModel | None

wait_or_finish(observe: bool, into: pcntoolkit.normative_model.NormativeModel | None = None, *data_sources) pcntoolkit.normative_model.NormativeModel | None
wrap_in_job(job_name, python_callable_path, data_path)
wrap_in_slurm_job(job_name: int | str, python_callable_path: str, data_path: str) list[str]
wrap_in_torque_job(job_name: int | str, python_callable_path: str, data_path: str) list[str]
active_jobs: Dict[str, str]
batch_size: int | None = 2
cross_validate: bool = False
cv_folds: int = 5
environment: str = None
failed_jobs: Dict[str, str]
job_commands: Dict[str, list[str]]
job_observer = None
job_type: str = 'local'
log_dir: str = ''
max_retries: int = 3
memory: str = '5gb'
n_batches: int | None = None
n_cores: int = 1
parallelize: bool = True
preamble: str = 'module load anaconda3'
save_dir = ''
task_id = ''
temp_dir: str = ''
time_limit_seconds: int = 300
time_limit_str: str | int = '00:05:00'
unique_log_dir = ''
unique_temp_dir = ''
load_and_execute(args)

Load a callable and data from a pickle file and execute it.

Parameters:

args (list[str]) – A list of arguments. The first argument is the path to the callable. The second argument is the path to the data. The third argument is the max number of retries.
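The load, execute, and retry pattern described above can be sketched as follows. This is not the pcntoolkit implementation. It assumes the standard-library pickle format, assumes the data file holds a tuple of positional arguments for the callable, and treats the third argument as the maximum number of attempts. The function name load_and_execute_sketch is hypothetical.

```python
import pickle
import tempfile
from pathlib import Path


def load_and_execute_sketch(args):
    """Load a pickled callable and a pickled argument tuple, then call it with retries."""
    callable_path, data_path, max_retries = args[0], args[1], int(args[2])
    with open(callable_path, "rb") as f:
        fn = pickle.load(f)
    with open(data_path, "rb") as f:
        chunk = pickle.load(f)

    last_error = None
    for _attempt in range(max_retries):
        try:
            # Assumption: the chunk is a tuple of positional arguments.
            return fn(*chunk)
        except Exception as exc:  # keep the error and try again
            last_error = exc
    raise RuntimeError(f"callable failed after {max_retries} attempts") from last_error


with tempfile.TemporaryDirectory() as tmp:
    fn_path = Path(tmp) / "callable.pkl"
    data_path = Path(tmp) / "data.pkl"
    # The built-in sum stands in for a fit or predict function.
    fn_path.write_bytes(pickle.dumps(sum))
    data_path.write_bytes(pickle.dumps(([1, 2, 3],)))
    result = load_and_execute_sketch([str(fn_path), str(data_path), "3"])
```

The arguments are passed as strings because, per the parameter description, they arrive as a list[str], as they would from a command line.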