ai2_kit.core.queue_system module#

class ai2_kit.core.queue_system.BaseQueueSystem[source]#

Bases: ABC

abstract cancel(job_id: str)[source]#
connector: BaseConnector#
abstract get_job_id_envvar() str[source]#
abstract get_job_id_pattern() str[source]#
abstract get_job_state(job_id: str, success_indicator_path: str) JobState[source]#
get_polling_interval() int[source]#
abstract get_script_suffix() str[source]#
get_setup_script() str[source]#
abstract get_submit_cmd() str[source]#
submit(script: str, cwd: str, name: str | None = None, success_indicator: str | None = None)[source]#
class ai2_kit.core.queue_system.Lsf[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: LSF#
get_job_id_envvar() str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
class ai2_kit.core.queue_system.PBS[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: PBS#
get_job_id_envvar() str[source]#
get_job_id_pattern() str[source]#
get_job_state(job_id: str, success_indicator_path: str) JobState[source]#
get_script_suffix() str[source]#
get_setup_script() str[source]#
get_submit_cmd() str[source]#
translate_table = {'B': JobState.RUNNING, 'C': JobState.COMPLETED, 'E': JobState.COMPLETED, 'H': JobState.HELD, 'Q': JobState.PENDING, 'R': JobState.RUNNING, 'S': JobState.HELD, 'W': JobState.PENDING}#
class ai2_kit.core.queue_system.QueueJobFuture(queue_system: BaseQueueSystem, job_id: str, script: str, cwd: str, name: str, success_indicator: str, polling_interval=10)[source]#

Bases: JobFuture

cancel()[source]#
done()[source]#
get_job_state()[source]#
is_success()[source]#
resubmit()[source]#
result(timeout: float = inf) JobState[source]#
async result_async(timeout: float = inf) JobState[source]#

This is not fully async, since job submission and state polling are still blocking, but it is already good enough to handle thousands of jobs (I guess).

property success_indicator_path#
pydantic model ai2_kit.core.queue_system.QueueSystemConfig[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "QueueSystemConfig",
   "type": "object",
   "properties": {
      "slurm": {
         "anyOf": [
            {
               "$ref": "#/$defs/Slurm"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "lsf": {
         "anyOf": [
            {
               "$ref": "#/$defs/LSF"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      },
      "pbs": {
         "anyOf": [
            {
               "$ref": "#/$defs/PBS"
            },
            {
               "type": "null"
            }
         ],
         "default": null
      }
   },
   "$defs": {
      "LSF": {
         "additionalProperties": false,
         "properties": {
            "bsub_bin": {
               "default": "bsub",
               "title": "Bsub Bin",
               "type": "string"
            },
            "bjobs_bin": {
               "default": "bjobs",
               "title": "Bjobs Bin",
               "type": "string"
            },
            "polling_interval": {
               "default": 10,
               "title": "Polling Interval",
               "type": "integer"
            }
         },
         "title": "LSF",
         "type": "object"
      },
      "PBS": {
         "additionalProperties": false,
         "properties": {
            "qsub_bin": {
               "default": "qsub",
               "title": "Qsub Bin",
               "type": "string"
            },
            "qstat_bin": {
               "default": "qstat",
               "title": "Qstat Bin",
               "type": "string"
            },
            "qdel_bin": {
               "default": "qdel",
               "title": "Qdel Bin",
               "type": "string"
            }
         },
         "title": "PBS",
         "type": "object"
      },
      "Slurm": {
         "additionalProperties": false,
         "properties": {
            "sbatch_bin": {
               "default": "sbatch",
               "title": "Sbatch Bin",
               "type": "string"
            },
            "squeue_bin": {
               "default": "squeue",
               "title": "Squeue Bin",
               "type": "string"
            },
            "scancel_bin": {
               "default": "scancel",
               "title": "Scancel Bin",
               "type": "string"
            },
            "polling_interval": {
               "default": 10,
               "title": "Polling Interval",
               "type": "integer"
            }
         },
         "title": "Slurm",
         "type": "object"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
field lsf: LSF | None = None#
field pbs: PBS | None = None#
field slurm: Slurm | None = None#
pydantic model LSF[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "LSF",
   "type": "object",
   "properties": {
      "bsub_bin": {
         "default": "bsub",
         "title": "Bsub Bin",
         "type": "string"
      },
      "bjobs_bin": {
         "default": "bjobs",
         "title": "Bjobs Bin",
         "type": "string"
      },
      "polling_interval": {
         "default": 10,
         "title": "Polling Interval",
         "type": "integer"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
  • bjobs_bin (str)

  • bsub_bin (str)

  • polling_interval (int)

field bjobs_bin: str = 'bjobs'#
field bsub_bin: str = 'bsub'#
field polling_interval: int = 10#
pydantic model PBS[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "PBS",
   "type": "object",
   "properties": {
      "qsub_bin": {
         "default": "qsub",
         "title": "Qsub Bin",
         "type": "string"
      },
      "qstat_bin": {
         "default": "qstat",
         "title": "Qstat Bin",
         "type": "string"
      },
      "qdel_bin": {
         "default": "qdel",
         "title": "Qdel Bin",
         "type": "string"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
  • qdel_bin (str)

  • qstat_bin (str)

  • qsub_bin (str)

field qdel_bin: str = 'qdel'#
field qstat_bin: str = 'qstat'#
field qsub_bin: str = 'qsub'#
pydantic model Slurm[source]#

Bases: BaseModel

Show JSON schema
{
   "title": "Slurm",
   "type": "object",
   "properties": {
      "sbatch_bin": {
         "default": "sbatch",
         "title": "Sbatch Bin",
         "type": "string"
      },
      "squeue_bin": {
         "default": "squeue",
         "title": "Squeue Bin",
         "type": "string"
      },
      "scancel_bin": {
         "default": "scancel",
         "title": "Scancel Bin",
         "type": "string"
      },
      "polling_interval": {
         "default": 10,
         "title": "Polling Interval",
         "type": "integer"
      }
   },
   "additionalProperties": false
}

Config:
  • extra: str = forbid

Fields:
  • polling_interval (int)

  • sbatch_bin (str)

  • scancel_bin (str)

  • squeue_bin (str)

field polling_interval: int = 10#
field sbatch_bin: str = 'sbatch'#
field scancel_bin: str = 'scancel'#
field squeue_bin: str = 'squeue'#
class ai2_kit.core.queue_system.Slurm[source]#

Bases: BaseQueueSystem

cancel(job_id: str)[source]#
config: Slurm#
get_job_id_envvar() str[source]#
get_job_id_pattern()[source]#
get_job_state(job_id: str, success_indicator_path: str) JobState[source]#
get_polling_interval()[source]#
get_script_suffix()[source]#
get_submit_cmd()[source]#
translate_table = {'CA': JobState.CANCELLED, 'CD': JobState.COMPLETED, 'CF': JobState.PENDING, 'CG': JobState.RUNNING, 'F': JobState.FAILED, 'NF': JobState.FAILED, 'PD': JobState.PENDING, 'R': JobState.RUNNING, 'RV': JobState.FAILED, 'SE': JobState.FAILED, 'TO': JobState.TIMEOUT}#
ai2_kit.core.queue_system.inject_cmd_to_script(script: str, cmd: str)[source]#

Find the position of the first line that is neither a comment nor empty, and inject the command before it.