AI2-Kit HPC Executor#
介绍#
ai2-kit
实现了一个轻量级的 HPC 执行器用于提交和管理 HPC 集群上的作业。相比于其它的 HPC 调度器或者支持 HPC 的工作流框架 (如 DpDispatcher, parsl, DFlow, etc), ai2-kit HPC executor
具有以下特点:
支持 SSH 远程执行和本地执行两种模式
支持通过跳板机连接 HPC 集群
支持远程执行 Python 函数
支持在登录节点直接执行简单命令或者函数
更高效稳定的作业状态轮询机制
基于 Checkpoint 的状态恢复机制
支持同步或者异步等待作业执行
易于定制或与其它框架集成
如果您在使用其它方案时遇到以下问题,不妨试试 ai2-kit HPC executor
:
过高的上手和学习成本
连接不稳定
需要频繁将数据复制到本地进行处理后再提交回集群运行
当前, ai2-kit HPC executor
只支持 Slurm 作业系统,对其它作业系统的支持视实际需求而定,欢迎提交 Issue 或 PR。
使用方法#
基本用法#
初始化 ai2-kit HPC executor
有两种方式,使用字典配置或者 Pydantic
对象, 两种方式的配置项完全一致,前者更适合直接使用,后者更适合集成到其它框架。 以下为使用字典进行配置的示例。
from ai2_kit.core.executor import HpcExecutor
executor = HpcExecutor.from_config({
'ssh': { # 指定 ssh 连接信息,如果缺省则使用本地执行
'host': 'user01@hpc-login01',
'gateway': { # 如果需要通过跳板机连接,可指定此配置 (可选)
'host': 'user01@jump-host',
}
},
'queue_system': {
'slurm': {} # 指定作业系统为 Slurm
},
'work_dir': '/home/user01/ai2-kit/work_dir', # 指定工作目录
'python_cmd': '/home/user01/conda/env/py39/bin/python', # 指定 Python 解释器
}, 'cheng-lab') # 指定集群名称 (可选)
executor.init() # 初始化执行器
executor.run('echo "hello world"') # 在登录节点执行命令
上述示例完成如下工作:
实例化一个
HpcExecutor
对象初始化
HpcExecutor
对象在登录节点执行命令
echo "hello world"
远程执行 Python 函数#
在 HPC 上执行复杂的计算任务的通常模式是:
在登录节点上通过命令行或者Python准备好计算任务的输入数据
提交作业到队列并等待
作业完成后在登录节点上使用命令行或者 Python 处理计算任务的输出数据
为了满足上述模式,ai2-kit HPC executor
除了提供 run
接口用于在登录节点执行命令外,还提供了 run_python_fn
用于在登录节点直接运行 Python 函数。
def add(a, b):
return a + b
result = executor.run_python_fn(add)(1, 2) # 在登录节点上运行 add 函数
需要注意的是,为了在远程节点执行本地 Python 函数必须满足以下条件:
本地 Python 环境与远程 Python 环境的主版本需一致 (如同为 3.8.x)
远程 Python 环境的配置可以通过
python_cmd
参数指定
函数的参数和返回值必须是可序列化的 (不能包含诸如锁、文件句柄等不可序列化的对象)
函数依赖的软件包必须存在于登录节点的 Python 环境中
例如,假设远程执行的函数使用了
numpy
包,那么登录节点的 Python 环境中必须存在numpy
包
如果函数依赖于其它本地实现的方法或者类,这些方法和类需要通过特殊方式定义, 否则会在远程出现
ModuleNotFoundError
此为 cloudpickle 的限制,详见: 1
提交作业#
ai2-kit HPC executor
提供了 submit
接口用于提交作业到 HPC 集群。 示例如下
script = '''\
#! /bin/bash
#SBATCH --job-name=demo
#SBATCH -N 1
#SBATCH --ntasks-per-node=4
#SBATCH --partition=cpu-small
#SBATCH --mem=1G
echo "hello world" > output.txt
'''
job = executor.submit(script) # 提交作业
除了上述直接编写脚本的方式外,也可以通过 ai2-kit
提供的工具类生成脚本,示例如下:
from ai2_kit.core.script import BashScript, BashStep, BashTemplate
header = '''\
#SBATCH --job-name=demo
#SBATCH -N 1
#SBATCH --ntasks-per-node=4
#SBATCH --partition=cpu-small
#SBATCH --mem=1G
'''
script = BashScript(
template=BashTemplate(
header=header,
),
steps=[
BashStep(cmd='echo "hello world"'),
]
)
job = executor.submit(script.render(), cwd='/path/to/cwd') # 提交作业
同时,提交作业时可以指定额外的参数满足不同的需求,如设定执行目录和用于错误恢复的checkpoint文件。
等待作业完成#
提交任务提交后可以有同步和异步两种方式等待其完成。示例如下:
...
state = job.result() # 同步等待作业完成
async def main():
...
state = await job.result_async() # 异步等待作业完成
同步等待由于会阻塞后续的代码执行,因此对于需要并行提交多个作业的场景不适用,此时可以使用异步等待。
实现简单工作流#
虽然 ai2-kit
并未提供工作流调度引擎,但对于简单的任务来说,借助 Python 的异步支持和 ai2-kit
提供的工具类,可以很容易实现一个简单的工作流。接下来通过实现以下一个简单的任务做为演示:
前处理:实现一个 Python 函数创建 n 个工作目录及一个包含一个随机数的 input 文件
提交 n 个作业脚本,每个作业脚本读取 input 中的数字计算其平方写入 output 文件中
后处理:实现一个 Python 函数,在所有作业完成后读出所有文件的 output 值并求和
具体的代码实现可在 simple-workflow 中查看。
从代码实现上可以看出,通过 ai2-kit
实现的工作流本质仍然是普通的 Python 代码,只是当需要在远程执行函数或者提交作业时才会调用到 ai2-kit HPC executor
提供的接口。因此,对于熟悉 Python 编码的同学上手 ai2-kit
并不困难。
更复杂的工作流也是采用同样的思路实现,只是在此基础上增加了针对配置文件的建模和解析,以及使用条件分支和循环进行流程控制。如有兴趣可参考 ai2-kit.workflow
模块中的代码。