ai2_kit.core.queue_system module

ai2_kit.core.queue_system module#

class ai2_kit.core.queue_system.BaseQueueSystem[source]#

Bases: ABC

abstract cancel(job_id: str)[source]#
connector: BaseConnector#
abstract get_job_id_envvar() → str[source]#
abstract get_job_id_pattern() → str[source]#
abstract get_job_state(job_id: str, success_indicator_path: str) → JobState[source]#
get_polling_interval() → int[source]#
abstract get_script_suffix() → str[source]#
get_setup_script() → str[source]#
abstract get_submit_cmd() → str[source]#
submit(script: str, cwd: str, name: Optional[str] = None, success_indicator: Optional[str] = None)[source]#
class ai2_kit.core.queue_system.Lsf[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: LSF#
get_job_id_envvar() → str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) → JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
class ai2_kit.core.queue_system.PBS[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: PBS#
get_job_id_envvar() → str[source]#
get_job_id_pattern() → str[source]#
get_job_state(job_id: str, success_indicator_path: str) → JobState[source]#
get_script_suffix() → str[source]#
get_setup_script() → str[source]#
get_submit_cmd() → str[source]#
translate_table = {'B': JobState.RUNNING, 'C': JobState.COMPLETED, 'E': JobState.COMPLETED, 'H': JobState.HELD, 'Q': JobState.PENDING, 'R': JobState.RUNNING, 'S': JobState.HELD, 'W': JobState.PENDING}#
class ai2_kit.core.queue_system.QueueJobFuture(queue_system: BaseQueueSystem, job_id: str, script: str, cwd: str, name: str, success_indicator: str, polling_interval=10)[source]#

Bases: JobFuture

cancel()[source]#
done()[source]#
get_job_state()[source]#
is_success()[source]#
resubmit()[source]#
result(timeout: float = inf) → JobState[source]#
async result_async(timeout: float = inf) → JobState[source]#

This is not fully asynchronous, since job submission and state polling are still blocking, but it should be good enough to handle thousands of jobs (an informal estimate, not a measured limit).

property success_indicator_path#
class ai2_kit.core.queue_system.QueueSystemConfig(*, slurm: Optional[Slurm] = None, lsf: Optional[LSF] = None, pbs: Optional[PBS] = None)[source]#

Bases: BaseModel

class LSF(*, bsub_bin: str = 'bsub', bjobs_bin: str = 'bjobs', polling_interval: int = 10)[source]#

Bases: BaseModel

bjobs_bin: str#
bsub_bin: str#
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'bjobs_bin': FieldInfo(annotation=str, required=False, default='bjobs'), 'bsub_bin': FieldInfo(annotation=str, required=False, default='bsub'), 'polling_interval': FieldInfo(annotation=int, required=False, default=10)}#

Metadata about the fields defined on the model: a mapping of field names to FieldInfo (pydantic.fields.FieldInfo) objects.

This replaces Model.__fields__ from Pydantic V1.

polling_interval: int#
class PBS(*, qsub_bin: str = 'qsub', qstat_bin: str = 'qstat', qdel_bin: str = 'qdel')[source]#

Bases: BaseModel

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'qdel_bin': FieldInfo(annotation=str, required=False, default='qdel'), 'qstat_bin': FieldInfo(annotation=str, required=False, default='qstat'), 'qsub_bin': FieldInfo(annotation=str, required=False, default='qsub')}#

Metadata about the fields defined on the model: a mapping of field names to FieldInfo (pydantic.fields.FieldInfo) objects.

This replaces Model.__fields__ from Pydantic V1.

qdel_bin: str#
qstat_bin: str#
qsub_bin: str#
class Slurm(*, sbatch_bin: str = 'sbatch', squeue_bin: str = 'squeue', scancel_bin: str = 'scancel', polling_interval: int = 10)[source]#

Bases: BaseModel

model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'polling_interval': FieldInfo(annotation=int, required=False, default=10), 'sbatch_bin': FieldInfo(annotation=str, required=False, default='sbatch'), 'scancel_bin': FieldInfo(annotation=str, required=False, default='scancel'), 'squeue_bin': FieldInfo(annotation=str, required=False, default='squeue')}#

Metadata about the fields defined on the model: a mapping of field names to FieldInfo (pydantic.fields.FieldInfo) objects.

This replaces Model.__fields__ from Pydantic V1.

polling_interval: int#
sbatch_bin: str#
scancel_bin: str#
squeue_bin: str#
lsf: Optional[LSF]#
model_computed_fields: ClassVar[dict[str, ComputedFieldInfo]] = {}#

A dictionary of computed field names and their corresponding ComputedFieldInfo objects.

model_config: ClassVar[ConfigDict] = {'extra': 'forbid'}#

Configuration for the model; it should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).

model_fields: ClassVar[dict[str, FieldInfo]] = {'lsf': FieldInfo(annotation=Union[QueueSystemConfig.LSF, NoneType], required=False, default=None), 'pbs': FieldInfo(annotation=Union[QueueSystemConfig.PBS, NoneType], required=False, default=None), 'slurm': FieldInfo(annotation=Union[QueueSystemConfig.Slurm, NoneType], required=False, default=None)}#

Metadata about the fields defined on the model: a mapping of field names to FieldInfo (pydantic.fields.FieldInfo) objects.

This replaces Model.__fields__ from Pydantic V1.

pbs: Optional[PBS]#
slurm: Optional[Slurm]#
class ai2_kit.core.queue_system.Slurm[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: Slurm#
get_job_id_envvar() → str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) → JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
translate_table = {'CA': JobState.CANCELLED, 'CD': JobState.COMPLETED, 'CF': JobState.PENDING, 'CG': JobState.RUNNING, 'F': JobState.FAILED, 'NF': JobState.FAILED, 'PD': JobState.PENDING, 'R': JobState.RUNNING, 'RV': JobState.FAILED, 'SE': JobState.FAILED, 'TO': JobState.TIMEOUT}#
ai2_kit.core.queue_system.inject_cmd_to_script(script: str, cmd: str)[source]#

Find the position of the first line that is neither a comment nor empty, and inject the command before it.