Source code for ai2_kit.core.script

from typing import Optional, List, Union, Sequence
import shlex

from .pydantic import BaseModel

def exit_on_error_statment(v='__EXITCODE__'):
    """
    Build a bash statement that aborts the script if the previous command failed.

    The statement stores ``$?`` in the shell variable named by ``v`` and
    exits with that value when it is not zero.

    :param v: name of the shell variable that holds the exit code
    :return: a single-line bash statement
    """
    capture = f"{v}=$?"
    guard = f"if [ ${v} -ne 0 ]; then exit ${v}; fi"
    return f"{capture}; {guard}"
def eoe_step(cmd: str):
    """
    exit on error step

    Return ``cmd`` followed, on the next line, by the exit-on-error statement.
    """
    guard = exit_on_error_statment()
    return cmd + '\n' + guard
class BashTemplate(BaseModel):
    """
    Fixed text fragments that surround the rendered steps of a bash script.

    BashScript.render emits them in the order: shebang, header, setup,
    the steps, teardown.
    """
    # first line of the rendered script
    shebang: str = '#!/bin/bash'
    # text emitted right after the shebang
    header: str = ''
    # text emitted before the steps
    setup: str = ''
    # text emitted after the steps
    teardown: str = ''
class BashStep(BaseModel):
    """
    One command of a bash script, with optional working directory,
    checkpoint file and exit-on-error handling.
    """
    # command to run; a list is joined with single spaces when rendered
    cmd: Union[str, List[str]]
    # directory to pushd into before the command (popd afterwards)
    cwd: Optional[str] = None
    # checkpoint name; the file '<checkpoint>.checkpoint' marks the step as done
    checkpoint: Optional[str] = None
    # when True, an exit-on-error statement is appended after the command
    exit_on_error: bool = True

    def render(self) -> str:
        """
        Render this step as bash source.

        The command is framed by two lines of 80 ``#`` characters, then
        optionally followed by an exit-on-error statement, wrapped in a
        checkpoint test and wrapped in ``pushd``/``popd``.

        :return: the bash snippet for this step
        """
        # Join into a local: rendering must not rewrite self.cmd, otherwise
        # the field silently changes from a list to a str after the first call.
        cmd = ' '.join(self.cmd) if isinstance(self.cmd, list) else self.cmd

        banner = '#' * 80
        rendered_step = '\n'.join([banner, cmd, banner])

        if self.exit_on_error:
            rendered_step = '\n'.join([
                rendered_step,
                exit_on_error_statment(),
            ])

        if self.checkpoint:
            checkpoint = shlex.quote(self.checkpoint + '.checkpoint')
            msg = shlex.quote(f"hit {checkpoint}, skip")
            # NOTE: the checkpoint file is touched right after the step; a
            # failed step only skips it when exit_on_error makes the script exit.
            rendered_step = '\n'.join([
                f'if [ -f {checkpoint} ]; then echo {msg}; else',
                rendered_step,
                f'touch {checkpoint}; fi  # create checkpoint on success',
            ])

        if self.cwd:
            cwd = shlex.quote(self.cwd)
            rendered_step = '\n'.join([
                f'pushd {cwd} || exit 1',
                rendered_step,
                'popd',
            ])

        return rendered_step
# Body of a script: an ordered sequence whose items are either raw bash
# text (str) or BashStep objects that are rendered to bash text.
BashSteps = Sequence[Union[str, BashStep]]
class BashScript(BaseModel):
    """
    A bash script made of an optional template and a sequence of steps.
    """
    # surrounding fragments; when None only the steps are rendered
    template: Optional[BashTemplate]
    # raw bash lines and/or BashStep objects
    steps: BashSteps

    def render(self):
        """
        Render the whole script as a single string.

        Without a template the result is just the rendered steps; otherwise
        shebang, header, setup, steps and teardown are joined with newlines.
        """
        body = _render_bash_steps(self.steps)
        tpl = self.template
        if tpl is None:
            return body
        sections = (tpl.shebang, tpl.header, tpl.setup, body, tpl.teardown)
        return '\n'.join(sections)
def _render_bash_steps(steps: BashSteps): rendered_steps = [] for step in steps: if isinstance(step, str): rendered_steps.append(step) else: assert isinstance(step, BashStep) rendered_steps.append(step.render()) rendered_steps.append('') return '\n'.join(rendered_steps)
def make_gpu_parallel_steps(step_groups: List[Union[BashSteps, BashStep, str]]):
    """
    Build bash steps that run each group of steps in the background on one GPU.

    The generated script splits ``$CUDA_VISIBLE_DEVICES`` on commas, assigns
    group ``i`` to GPU number ``i % <number of GPUs>``, and runs the group in
    a background ``{ ...; } &`` block with ``CUDA_VISIBLE_DEVICES`` set to
    that GPU. Each block calls ``flock 3`` on the file ``_gpu_<GPU>.lock``
    (presumably so that groups sharing a GPU run one after another), and a
    final ``wait`` blocks until all groups finish.

    :param step_groups: one entry per group; an entry is a single step
        (str or BashStep) or a list/tuple of steps
    :return: a list of steps (str and/or BashStep) for the parallel script
    :raises AssertionError: if an entry is neither a step nor a list/tuple
    """
    p_steps = [
        'IFS="," read -r -a _GPUS<<< "$CUDA_VISIBLE_DEVICES"',
        '_TOTAL_GPUS=${#_GPUS[@]}',
    ]
    for i, steps in enumerate(step_groups):
        if isinstance(steps, str):
            steps = [steps]
        elif not isinstance(steps, (list, tuple)):
            # BashSteps is a Sequence, so tuples are accepted as well as
            # lists; anything else must be a single BashStep.
            assert isinstance(steps, BashStep), f'expect list of steps, got {type(steps)}'
            steps = [steps]
        p_steps += [
            f'_NTH_GPU=$(({i} % $_TOTAL_GPUS))',
            '_GPU=${_GPUS[$_NTH_GPU]}',
            'touch "_gpu_$_GPU.lock"',
            '{',
            'export CUDA_VISIBLE_DEVICES=$_GPU',
            'flock 3',
            f'echo "run steps group {i} on $_GPU, CUDA_VISIBLE_DEVICES=$CUDA_VISIBLE_DEVICES"',
            *steps,
            '} 3<"_gpu_$_GPU.lock" &',
        ]
    p_steps.append('wait')
    return p_steps