Source code for ai2_kit.domain.dpff

from dpdata.unit import econvs
from typing import List, Optional

import numpy as np

import dpdata
import os
import re


from .dplr import set_dplr_ext_from_cp2k_output, get_sel_ids
from ai2_kit.core.log import get_logger

logger = get_logger(__name__)


[docs]def dpdata_read_cp2k_dpff_data( cp2k_dir: str, cp2k_output: str, wannier_file: str, type_map: List[str], sys_charge_map: List[int], model_charge_map: List[int], ewald_h: float, ewald_beta: float, ext_efield, sel_type: List[int], wannier_cutoff: float = 1.0, wannier_spread_file: Optional[str] = None, ): """ Gnereate dpdata from cp2k output and wannier file for DPLR :param cp2k_dir: the directory of cp2k output and wannier file :param cp2k_output: the cp2k output file :param wannier_file: the wannier file :param type_map: the type map of atom type, for example, [O,H] :param sys_charge_map: the charge map of atom in system, for example, [6, 1] :param model_charge_map: the charge map of wannier in model, for example, [-8] :param ewald_h: the ewald_h parameter used in dplr/dpff model :param ewald_beta: the ewald_beta parameter used in dplr/dpff model :param ext_efield: the external electric field :param sel_type: the selected type of atom, for example, [0] means atom type 0, aka O is selected :param wannier_cutoff: the cutoff to allocate wannier centers around atoms :param wannier_spread_file: the wannier spread file, if provided, the spread data will be added to dp_sys :return dp_sys: dpdata.LabeledSystem In addition to the common energy data, the following data are added: - atomic_dipole: the atomic dipole of selected atoms - ext_efield: the external electric field - efield: the direction vector of atomic electric field - aparam: the atomic electric field magnitude """ cp2k_output = os.path.join(cp2k_dir, cp2k_output) wannier_file = os.path.join(cp2k_dir, wannier_file) wannier_spread_file = ( os.path.join(cp2k_dir, wannier_spread_file) if wannier_spread_file else None ) dp_sys = dpdata.LabeledSystem(cp2k_output, fmt="cp2k/output") try: dp_sys = set_dpff_ext_from_cp2k_output( dp_sys, cp2k_output, wannier_file, type_map, sys_charge_map, model_charge_map, ewald_h, ewald_beta, ext_efield, sel_type, wannier_cutoff, wannier_spread_file, ) except ValueError: dp_sys = None return dp_sys
[docs]def set_dpff_ext_from_cp2k_output( dp_sys: dpdata.LabeledSystem, cp2k_output: str, wannier_file: str, type_map: List[str], sys_charge_map: List[int], model_charge_map: List[int], ewald_h: float, ewald_beta: float, ext_efield, sel_type: List[int], wannier_cutoff: float = 1.0, wannier_spread_file: Optional[str] = None, ): # with atomic_dipole # update energy # add ext_efield, efield, aparam dplr_dp_sys = set_dplr_ext_from_cp2k_output( dp_sys, wannier_file, type_map, sel_type, wannier_cutoff, wannier_spread_file, model_charge_map, ) atomic_dipole = dplr_dp_sys.data["atomic_dipole"].reshape(-1, 3) # type: ignore with open(cp2k_output, "r", encoding="UTF-8") as fp: raw_energy = get_cp2k_output_total_energy(fp) ext_efield = np.reshape(ext_efield, [1, 3]) natoms = dp_sys.get_natoms() nframes = dp_sys.get_nframes() if nframes == 0: return None assert nframes == 1, "Only support one frame" symbols = np.array(dp_sys.data["atom_names"])[dp_sys.data["atom_types"]] sel_ids = get_sel_ids(dp_sys, type_map, sel_type) ion_coords = dp_sys.data["coords"].reshape(-1, 3) # atomic_dipole.shape = (natoms * 3) wannier_coords = (ion_coords + atomic_dipole)[sel_ids].reshape(-1, 3) extended_coords = np.concatenate((ion_coords, wannier_coords), axis=0) # get extended charges extended_charges = np.zeros(len(extended_coords)) for symbol, ion_charge in zip(type_map, sys_charge_map): extended_charges[np.where(symbols == symbol)[0]] = ion_charge sel_symbols = symbols[sel_ids] for ii, wannier_charge in zip(sel_type, model_charge_map): sel_symbol = type_map[ii] extended_charges[np.where(sel_symbols == sel_symbol)[0] + natoms] = ( wannier_charge ) total_dipole = np.sum(extended_coords * extended_charges.reshape(-1, 1), axis=0) energy = raw_energy - np.inner(total_dipole, ext_efield.reshape(3)) dp_sys.data["energies"] = np.reshape(energy, [-1]) pbc_efield = get_pbc_atomic_efield( dp_sys, extended_coords, extended_charges, ewald_h, ewald_beta ) # natoms * 3 _efield = pbc_efield + ext_efield aparam = np.linalg.norm(_efield, axis=-1) efield = np.array(_efield) / aparam.reshape(natoms, 1) dp_sys.data["ext_efield"] = ext_efield.reshape(nframes, 3) dp_sys.data["efield"] = efield.reshape(nframes, natoms, 3) dp_sys.data["aparam"] = aparam.reshape(nframes, natoms, -1) return dp_sys
[docs]def get_cp2k_output_total_energy(fp): start_pattern = r" Total charge density g-space grids:" end_pattern = r" Total energy:" start_pattern = re.compile(start_pattern) end_pattern = re.compile(end_pattern) flag = False data_lines = [] nloop = 0 for line in fp: line = line.strip("\n") if start_pattern.match(line): flag = True elif flag and end_pattern.match(line): flag = False nloop += 1 if flag is True: data_lines.append(line) data_lines = np.reshape(data_lines, (nloop, -1)) tot_e = 0.0 # grep the data from the last iteration for kw in data_lines[-1, 2:-1]: kw = kw.split(":") k = kw[0].strip(" ") v = float(kw[1]) * econvs["hartree"] if k != "Fermi energy": tot_e += v return tot_e
[docs]def get_pbc_atomic_efield( dp_sys, extended_coords, extended_charges, ewald_h, ewald_beta, ): from deepmd.infer.ewald_recp import EwaldRecp er = EwaldRecp(ewald_h, ewald_beta) natoms = dp_sys.get_natoms() cell = dp_sys.data["cells"] e, f, v = er.eval( extended_coords.reshape(1, -1), extended_charges.reshape(1, -1), cell.reshape(1, -1), ) extended_efield = f.reshape(-1, 3) / extended_charges.reshape(-1, 1) # natoms * 3 pbc_efield = extended_efield[:natoms] return pbc_efield