pcntoolkit.util.runner#
Classes#
Runner | Initialize the runner.
Functions#
load_and_execute(args) | Load a callable and data from a pickle file and execute it.
Module Contents#
- class Runner(parallelize: bool = False, job_type: Literal['torque', 'slurm'] = 'slurm', n_batches: int | None = None, batch_size: int | None = None, n_cores: int = 1, time_limit: str | int = '00:05:00', memory: str = '5GB', max_retries: int = 3, environment: str | None = None, cross_validate: bool = False, cv_folds: int = 5, preamble: str = 'module load anaconda3', log_dir: str | None = None, temp_dir: str | None = None)#
Initialize the runner.
- Parameters:
parallelize (bool, optional) – Whether to parallelize the jobs.
job_type (Literal["torque", "slurm"], optional) – The type of job to use.
n_batches (int, optional) – The number of jobs to run in parallel.
n_cores (int, optional) – The number of cores to use for each job.
time_limit (str | int, optional) – The time limit for each job.
memory (str, optional) – The memory to use for each job.
max_retries (int, optional) – The maximum number of retries for each job.
environment (str, optional) – The environment to use for each job.
cross_validate (bool, optional) – Whether to cross-validate the model.
cv_folds (int, optional) – The number of folds to use for cross-validation.
preamble (str, optional) – The preamble to use for each job.
log_dir (str, optional) – The directory to save the logs to.
temp_dir (str, optional) – The directory to save the temporary files to.
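As a rough illustration of the constructor arguments above, the sketch below collects keyword arguments for a SLURM run and passes them to Runner through a small helper. The helper name make_runner, the directory names, and all values are illustrative assumptions, not recommended settings. The import is deferred so that the snippet can be read and run on a machine without the toolkit or a cluster.

```python
# Keyword arguments named after the documented Runner parameters.
# Every value below is an illustrative assumption.
runner_kwargs = {
    "parallelize": True,
    "job_type": "slurm",        # the other documented option is "torque"
    "n_batches": 4,
    "n_cores": 2,
    "time_limit": "00:30:00",   # same format as the documented default
    "memory": "8GB",
    "max_retries": 3,
    "cross_validate": False,
    "preamble": "module load anaconda3",
    "log_dir": "runner_logs",   # hypothetical path
    "temp_dir": "runner_temp",  # hypothetical path
}


def make_runner(**overrides):
    """Hypothetical helper: build a Runner from the settings above plus overrides."""
    # Imported lazily so that defining the helper does not require pcntoolkit.
    from pcntoolkit.util.runner import Runner

    return Runner(**{**runner_kwargs, **overrides})
```

On a system with the toolkit installed, `make_runner(cross_validate=True, cv_folds=5)` would build a cross-validating runner with otherwise identical settings.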
- check_job_status(job_name: str) tuple[bool, bool, str | None]#
Check if a job has failed by looking for a success file.
- Returns:
(is_running, finished_with_error, error_message). If the job is still running, returns (True, False, None). If the job finished successfully, returns (False, False, None). If the job failed, returns (False, True, error_message).
- Return type:
tuple[bool, bool, Optional[str]]
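The three possible return values can be told apart with two checks. The following is a minimal sketch of how calling code might interpret the triple; the function name describe_job_status and the example error text are assumptions made for illustration.

```python
from typing import Optional


def describe_job_status(status: tuple[bool, bool, Optional[str]]) -> str:
    """Turn an (is_running, finished_with_error, error_message) triple into text."""
    is_running, finished_with_error, error_message = status
    if is_running:
        return "running"
    if finished_with_error:
        return f"failed: {error_message}"
    return "finished"


print(describe_job_status((True, False, None)))
print(describe_job_status((False, True, "out of memory")))
print(describe_job_status((False, False, None)))
```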
- check_jobs_status() tuple[Dict[str, str], Dict[str, str], Dict[str, str]]#
Check all jobs in active_job_ids for errors.
- Returns:
A tuple of three dictionaries: job names mapped to job IDs for running jobs, job names mapped to error messages for failed jobs, and job names mapped to job IDs for finished jobs.
- Return type:
tuple[Dict[str, str], Dict[str, str], Dict[str, str]]
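When jobs are dispatched without observing them, a caller could poll this method until nothing is running. The sketch below shows one such loop. It is not the toolkit's own waiting logic: the function wait_for_jobs and the FakeRunner stand-in are hypothetical, and the loop only assumes the return shape documented above.

```python
import time


def wait_for_jobs(runner, poll_seconds: float = 30.0) -> tuple[dict, dict]:
    """Poll runner.check_jobs_status() until no job is running.

    Returns the (failed, finished) dictionaries from the last poll.
    """
    while True:
        running, failed, finished = runner.check_jobs_status()
        if not running:
            return failed, finished
        time.sleep(poll_seconds)


class FakeRunner:
    """Stand-in for a real Runner, used only to exercise the loop."""

    def __init__(self):
        self._polls = 0

    def check_jobs_status(self):
        self._polls += 1
        if self._polls < 3:
            # Both jobs are still running on the first two polls.
            return {"job_0": "101", "job_1": "102"}, {}, {}
        # On the third poll one job has failed and one has finished.
        return {}, {"job_1": "example error"}, {"job_0": "101"}


failed, finished = wait_for_jobs(FakeRunner(), poll_seconds=0.0)
print("failed:", failed)
print("finished:", finished)
```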
- create_temp_and_log_dir()#
- extend(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#
Extend a normative model on a dataset.
- Parameters:
model (NormativeModel) – The normative model to extend.
data (NormData) – The data to extend the model on.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.
- Returns:
The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- extend_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#
Extend a normative model on a dataset and predict on another dataset.
- Parameters:
model (NormativeModel) – The normative model to extend.
fit_data (NormData) – The data to extend the model on.
predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – If false, the function will dispatch the jobs and return.
- Returns:
The extended model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- fit(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#
Fit a normative model on a dataset.
- Parameters:
model (NormativeModel) – The normative model to fit.
data (NormData) – The data to fit the model on.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object; it will have to be loaded manually using the load function when the jobs are done.
- Returns:
The fitted model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- fit_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#
Fit a normative model on a dataset and predict on another dataset.
- Parameters:
model (NormativeModel) – The normative model to fit.
fit_data (NormData) – The data to fit the model on.
predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return. In that case, the model will not be loaded into the model object; it will have to be loaded manually using the load function when the jobs are done.
- Returns:
The fitted model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- get_all_job_file_paths(job_name)#
- get_data_path(job_name)#
- get_extend_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
- get_extend_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
- get_fit_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#
Returns a callable that fits a model on a chunk of data.
- get_fit_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#
Returns a callable that fits a model on a chunk of data and predicts on another chunk of data.
- get_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str) Callable#
Loads each fold model and predicts on the corresponding fold of data. Model n is used to predict on fold n.
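The pairing of model n with fold n can be pictured with a plain k-fold split. The sketch below is only an illustration under assumptions: it uses a contiguous split over sample indices and assumes that fold n is the held-out part for model n. How the toolkit actually assigns samples to folds may differ, and the function name kfold_indices is hypothetical.

```python
def kfold_indices(n_samples: int, n_folds: int) -> list[tuple[list[int], list[int]]]:
    """Return one (train_indices, test_indices) pair per fold."""
    indices = list(range(n_samples))
    # Spread the remainder over the first folds so sizes differ by at most one.
    base, extra = divmod(n_samples, n_folds)
    folds = []
    start = 0
    for n in range(n_folds):
        size = base + (1 if n < extra else 0)
        test = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        folds.append((train, test))
        start += size
    return folds


folds = kfold_indices(n_samples=10, n_folds=5)
for n, (train, test) in enumerate(folds):
    # Under the assumption above, model n is fitted on `train`
    # and used to predict on `test`.
    print(f"fold {n}: test indices {test}")
```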
- get_python_callable_path(job_name)#
- get_transfer_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
Returns a callable that transfers a model on a chunk of data.
- get_transfer_predict_chunk_fn(model: pcntoolkit.normative_model.NormativeModel, save_dir: str, **kwargs) Callable#
- load_data(data_source: pcntoolkit.dataio.norm_data.NormData, fold_index: int | None = 0) None#
- load_model(fold_index: int | None = 0, into: pcntoolkit.normative_model.NormativeModel | None = None) pcntoolkit.normative_model.NormativeModel#
- predict(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True) pcntoolkit.normative_model.NormativeModel | None#
Predict on a dataset.
- Parameters:
model (NormativeModel) – The normative model to predict with.
data (NormData) – The data to predict on.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish.
- Return type:
None. If you want to load the model, use the runner.load_model function.
- save_callable_and_data(job_name: int | str, fn: Callable, chunk: tuple[pcntoolkit.dataio.norm_data.NormData] | tuple[pcntoolkit.dataio.norm_data.NormData, pcntoolkit.dataio.norm_data.NormData | None]) tuple[str, str]#
- set_task_id(task_name: str, model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData)#
- submit_jobs(fn: Callable, first_data_source: pcntoolkit.dataio.norm_data.NormData, second_data_source: pcntoolkit.dataio.norm_data.NormData | None = None, mode: Literal['unary', 'binary'] = 'unary') None#
Submit jobs to the job scheduler.
The predict_data argument is optional, and if it is not provided, None is passed to the function.
- Parameters:
fn (Callable) – Function to call. It should take two arguments.
fit_data (NormData) – Data to fit the model on.
predict_data (Optional[NormData], optional) – Data to predict on, by default None.
- transfer(model: pcntoolkit.normative_model.NormativeModel, data: pcntoolkit.dataio.norm_data.NormData, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#
Transfer a normative model to a new dataset.
- Parameters:
model (NormativeModel) – The normative model to transfer.
data (NormData) – The data to transfer the model to.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then return the transferred model. If false, the function will dispatch the jobs and return.
- Returns:
The transferred model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- transfer_predict(model: pcntoolkit.normative_model.NormativeModel, fit_data: pcntoolkit.dataio.norm_data.NormData, predict_data: pcntoolkit.dataio.norm_data.NormData | None = None, save_dir: str | None = None, observe: bool = True, **kwargs) pcntoolkit.normative_model.NormativeModel | None#
Transfer a normative model to a new dataset and predict on another dataset.
- Parameters:
model (NormativeModel) – The normative model to transfer.
fit_data (NormData) – The data to transfer the model to.
predict_data (Optional[NormData], optional) – The data to predict on. Can be None if cross-validation is used.
save_dir (Optional[str], optional) – The directory to save the model to. If None, the model will be saved in the model’s save directory.
observe (bool, optional) – Whether to observe the jobs. If true, the function will wait for the jobs to finish and then load the model into the model object. If false, the function will dispatch the jobs and return.
- Returns:
The transferred model. If observe is true, the function will wait for the jobs to finish and return the model object. If observe is false, the function will return None.
- Return type:
NormativeModel | None
- wait_or_finish(observe: bool, into: pcntoolkit.normative_model.NormativeModel | None = None, *data_sources) pcntoolkit.normative_model.NormativeModel | None#
- wrap_in_job(job_name, python_callable_path, data_path)#
- job_observer = None#
- save_dir = ''#
- task_id = ''#
- unique_log_dir = ''#
- unique_temp_dir = ''#
- load_and_execute(args)#
Load a callable and data from a pickle file and execute it.
- Parameters:
args (list[str]) – A list of arguments. The first argument is the path to the callable. The second argument is the path to the data. The third argument is the max number of retries.
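The argument layout described above can be sketched with the standard library. This is a minimal sketch, not the toolkit's implementation. It makes several assumptions: both files are plain pickle files (the real function may use a different serializer), the data is a tuple that is unpacked as positional arguments, and the retry count is the total number of attempts. The name load_and_execute_sketch and the stand-in callable and data are hypothetical.

```python
import operator
import os
import pickle
import tempfile


def load_and_execute_sketch(args: list[str]):
    """Load a pickled callable and pickled data, then call the callable.

    args[0] is the path to the callable, args[1] the path to the data,
    and args[2] the maximum number of retries (passed as a string).
    """
    callable_path, data_path, max_retries = args[0], args[1], int(args[2])
    with open(callable_path, "rb") as f:
        fn = pickle.load(f)
    with open(data_path, "rb") as f:
        data = pickle.load(f)

    last_error = None
    # Assumption: the retry count is the total number of attempts.
    for attempt in range(1, max_retries + 1):
        try:
            return fn(*data)
        except Exception as exc:  # retry on any failure
            last_error = exc
            print(f"attempt {attempt} of {max_retries} failed: {exc}")
    raise RuntimeError(f"all {max_retries} attempts failed") from last_error


with tempfile.TemporaryDirectory() as tmp:
    fn_path = os.path.join(tmp, "callable.pkl")
    data_path = os.path.join(tmp, "data.pkl")
    with open(fn_path, "wb") as f:
        pickle.dump(operator.add, f)  # stand-in for a chunk function
    with open(data_path, "wb") as f:
        pickle.dump((2, 3), f)        # stand-in for a tuple of data chunks
    result = load_and_execute_sketch([fn_path, data_path, "3"])

print(result)
```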