Source code for dfttopif.parsers.pwscf

from pypif.obj.common.property import Property

from .base import DFTParser, Value_if_true
import os
from pypif.obj.common.value import Value
from ase import Atoms

[docs]class PwscfParser(DFTParser): ''' Parser for PWSCF calculations '''
[docs] def get_name(self): return "PWSCF"
def _get_line(self, search_string, search_file, basedir=None, return_string=True, case_sens=True): '''Return the first line containing a set of strings in a file. If return_string is False, we just return whether such a line was found. If case_sens is False, the search is case insensitive. ''' if basedir == None: basedir = self._directory # default if os.path.isfile(os.path.join(basedir, search_file)): # if single search string if type(search_string) == type(''): search_string = [search_string] # if case insensitive, convert everything to lowercase if not case_sens: search_string = [i.lower() for i in search_string] with open(os.path.join(basedir, search_file)) as fp: # search for the strings line by line for line in fp: query_line = line if case_sens else line.lower() if all([i in query_line for i in search_string]): return line if return_string else True if return_string: raise Exception('%s not found in %s'%(' & '.join(search_string),os.path.join(basedir, search_file))) else: return False else: raise Exception('%s file does not exist'%os.path.join(basedir, search_file))
[docs] def test_if_from(self, directory): '''Look for PWSCF input and output files''' self.inputf = self.outputf = '' files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))] for f in files: if self._get_line('Program PWSCF', f, basedir=directory, return_string=False): self.outputf = f elif self._get_line('&control', f, basedir=directory, return_string=False, case_sens=False): self.inputf = f if self.inputf and self.outputf: return True return False
[docs] def get_version_number(self): '''Determine the version number from the output''' line = self._get_line('Program PWSCF', self.outputf) version = " ".join(line.split('start')[0].split()[2:]).lstrip('v.') # return Value(scalars=version) return version
[docs] def get_xc_functional(self): '''Determine the xc functional from the output''' # the xc functional is described by 1 or 4 strings # SLA PZ NOGX NOGC ( 1 1 0 0 0) # SLA PW PBX PBC (1434) # PBE ( 1 4 3 4 0 0) # PBE0 (6484) # HSE (14*4) xcstring = self._get_line('Exchange-correlation', self.outputf).split()[2:6] for word in range(4): if xcstring[word][0] == '(': xcstring = xcstring[:word] break return Value(scalars=" ".join(xcstring))
[docs] def get_cutoff_energy(self): '''Determine the cutoff energy from the output''' cutoff = self._get_line('kinetic-energy cutoff', self.outputf).split()[3:] return Value(scalars=float(cutoff[0]), units=cutoff[1])
[docs] def get_total_energy(self): '''Determine the total energy from the output''' with open(os.path.join(self._directory, self.outputf)) as fp: # reading file backwards in case relaxation run for line in reversed(fp.readlines()): if "!" in line and "total energy" in line: energy = line.split()[4:] return Property(scalars=float(energy[0]), units=energy[1]) raise Exception('%s not found in %s'%('! & total energy',os.path.join(self._directory, self.outputf)))
@Value_if_true def is_relaxed(self): '''Determine if relaxation run from the output''' return self._get_line('Geometry Optimization', self.outputf, return_string=False) def _is_converged(self): '''Determine if calculation converged; for a relaxation (static) run we look for ionic (electronic) convergence in the output''' if self.is_relaxed(): # relaxation run case return self._get_line(['End of', 'Geometry Optimization'], self.outputf, return_string=False) else: # static run case return self._get_line('convergence has been achieved', self.outputf, return_string=False)
[docs] def get_KPPRA(self): '''Determine the no. of k-points in the BZ (from the input) times the no. of atoms (from the output)''' # Find the no. of k-points fp = open(os.path.join(self._directory, self.inputf)).readlines() for l,ll in enumerate(fp): if "K_POINTS" in ll: # determine the type of input if len(ll.split()) > 1: if "gamma" in ll.split()[1].lower(): ktype = 'gamma' elif "automatic" in ll.split()[1].lower(): ktype = 'automatic' else: ktype = '' else: ktype = '' if ktype == 'gamma': # gamma point: # K_POINTS {gamma} nk = 1 elif ktype == 'automatic': # automatic: # K_POINTS automatic # 12 12 1 0 0 0 line = [int(i) for i in fp[l+1].split()[0:3]] nk = line[0]*line[1]*line[2] else: # manual: # K_POINTS # 3 # 0.125 0.125 0.0 1.0 # 0.125 0.375 0.0 2.0 # 0.375 0.375 0.0 1.0 nk = 0 for k in range(int(fp[l+1].split()[0])): nk += int(float(fp[l+2+k].split()[3])) # Find the no. of atoms natoms = int(self._get_line('number of atoms/cell', self.outputf).split()[4]) return Value(scalars=nk*natoms) fp.close() raise Exception('%s not found in %s'%('KPOINTS',os.path.join(self._directory, self.inputf)))
@Value_if_true def uses_SOC(self): '''Looks for line with "with spin-orbit" in the output''' return self._get_line('with spin-orbit', self.outputf, return_string=False)
[docs] def get_pp_name(self): '''Determine the pseudopotential names from the output''' ppnames = [] # Find the number of atom types natomtypes = int(self._get_line('number of atomic types', self.outputf).split()[5]) # Find the pseudopotential names with open(os.path.join(self._directory, self.outputf)) as fp: for line in fp: if "PseudoPot. #" in line: ppnames.append(next(fp).split('/')[-1].rstrip()) if len(ppnames) == natomtypes: return Value(scalars=ppnames) raise Exception('Could not find %i pseudopotential names'%natomtypes)
[docs] def get_U_settings(self): '''Determine the DFT+U type and parameters from the output''' with open(os.path.join(self._directory, self.outputf)) as fp: for line in fp: if "LDA+U calculation" in line: U_param = {} U_param['Type'] = line.split()[0] U_param['Values'] = {} # look through next several lines for nl in range(15): line2 = next(fp).split() if len(line2) > 1 and line2[0] == "atomic": pass # column titles elif len(line2) == 6: U_param['Values'][line2[0]] = {} U_param['Values'][line2[0]]['L'] = float(line2[1]) U_param['Values'][line2[0]]['U'] = float(line2[2]) U_param['Values'][line2[0]]['J'] = float(line2[4]) else: break # end of data block return Value(**U_param) return None
[docs] def get_vdW_settings(self): '''Determine the vdW type if using vdW xc functional or correction scheme from the input otherwise''' xc = self.get_xc_functional().scalars[0].value if 'vdw' in xc.lower(): # vdW xc functional return Value(scalars=xc) else: # look for vdw_corr in input vdW_dict = {'xdm':'Becke-Johnson XDM', 'ts': 'Tkatchenko-Scheffler', 'ts-vdw': 'Tkatchenko-Scheffler', 'tkatchenko-scheffler': 'Tkatchenko-Scheffler', 'grimme-d2': 'Grimme D2', 'dft-d': 'Grimme D2'} if self._get_line('vdw_corr', self.inputf, return_string=False, case_sens=False): line = self._get_line('vdw_corr', self.inputf, return_string=True, case_sens=False) vdwkey = str(line.split('=')[-1].replace("'", "").replace(',', '').lower().rstrip()) return Value(scalars=vdW_dict[vdwkey]) return None
[docs] def get_pressure(self): '''Determine the pressure from the output''' if self._get_line('total stress', self.outputf, return_string=False) == False: return None else: line = self._get_line('total stress', self.outputf) # total stress (Ry/bohr**3) (kbar) P= -2.34 # P= runs into the value if very large pressure pvalue = float(line.split()[-1].replace('P=', '')) return Property(scalars=pvalue, units='kbar')
[docs] def get_stresses(self): '''Determine the stress tensor from the output''' stress_data = [] # stress tensor at each iteration with open(os.path.join(self._directory, self.outputf)) as fp: for line in fp: if "total" in line and "stress" in line: stress = [] for i in range(3): stress.append([float(j) for j in next(fp).split()[3:6]]) stress_data.append(stress) if len(stress_data) > 0: # return the final stress tensor return Property(matrices=stress_data[-1], units='kbar') else: return None
[docs] def get_output_structure(self): '''Determine the structure from the output''' bohr_to_angstrom = 0.529177249 # determine the number of atoms natoms = int(float(self._get_line('number of atoms/cell', self.outputf).split('=')[-1])) # determine the initial lattice parameter alat = float(self._get_line('lattice parameter (alat)', self.outputf).split('=')[-1].split()[0]) # find the initial unit cell unit_cell = [] with open(os.path.join(self._directory, self.outputf), 'r') as fp: for line in fp: if "crystal axes:" in line: for i in range(3): unit_cell.append([float(j)*alat*bohr_to_angstrom for j in next(fp).split('(')[-1].split(')')[0].split()]) break if len(unit_cell) == 0: raise Exception('Cannot find the initial unit cell') # find the initial atomic coordinates coords = [] ; atom_symbols = [] with open(os.path.join(self._directory, self.outputf), 'r') as fp: for line in fp: if "site n." in line and "atom" in line and "positions" in line and "alat units" in line: for i in range(natoms): coordline = next(fp) atom_symbols.append(''.join([i for i in coordline.split()[1] if not i.isdigit()])) coord_conv_factor = alat*bohr_to_angstrom coords.append([float(j)*coord_conv_factor for j in coordline.rstrip().split('=')[-1].split('(')[-1].split(')')[0].split()]) break if len(coords) == 0: raise Exception('Cannot find the initial atomic coordinates') if type(self.is_relaxed()) == type(None): # static run: create, populate, and return the initial structure structure = Atoms(symbols=atom_symbols, cell=unit_cell, pbc=True) structure.set_positions(coords) return structure else: # relaxation run: update with the final structure with open(os.path.join(self._directory, self.outputf)) as fp: for line in fp: if "Begin final coordinates" in line: if 'new unit-cell volume' in next(fp): # unit cell allowed to change next(fp) # blank line # get the final unit cell unit_cell = [] cellheader = next(fp) if 'bohr' in cellheader.lower(): cell_conv_factor = bohr_to_angstrom elif 'angstrom' in cellheader.lower(): cell_conv_factor = 1.0 else: alat = float(cellheader.split('alat=')[-1].replace(')', '')) cell_conv_factor = alat*bohr_to_angstrom for i in range(3): unit_cell.append([float(j)*cell_conv_factor for j in next(fp).split()]) next(fp) # blank line # get the final atomic coordinates coordtype = next(fp).split()[-1].replace('(', '').replace(')', '') if coordtype == 'bohr': coord_conv_factor = bohr_to_angstrom elif coordtype == 'angstrom' or coordtype == 'crystal': coord_conv_factor = 1.0 else: coord_conv_factor = alat*bohr_to_angstrom coords = [] # reinitialize the coords for i in range(natoms): coordline = next(fp).split() coords.append([float(j)*coord_conv_factor for j in coordline[1:4]]) # create, populate, and return the final structure structure = Atoms(symbols=atom_symbols, cell=unit_cell, pbc=True) if coordtype == 'crystal': structure.set_scaled_positions(coords) # direct coord else: structure.set_positions(coords) # cartesian coord return structure raise Exception('Cannot find the final coordinates')
[docs] def get_dos(self): '''Find the total DOS shifted by the Fermi energy''' # find the dos file fildos = '' files = [f for f in os.listdir(self._directory) if os.path.isfile(os.path.join(self._directory, f))] for f in files: fp = open(os.path.join(self._directory, f), 'r') first_line = next(fp) if "E (eV)" in first_line and "Int dos(E)" in first_line: fildos = f ndoscol = len(next(fp).split())-2 # number of spin channels fp.close() break fp.close() if not fildos: return None # cannot find DOS # get the Fermi energy line = self._get_line('the Fermi energy is', self.outputf) efermi = float(line.split('is')[-1].split()[0]) # grab the DOS energy = [] ; dos = [] fp = open(os.path.join(self._directory, fildos), 'r') next(fp) # comment line for line in fp: ls = line.split() energy.append(float(ls[0])-efermi) dos.append(sum([float(i) for i in ls[1:1+ndoscol]])) return Property(scalars=dos, units='number of states per unit cell', conditions=Value(name='energy', scalars=energy, units='eV'))
[docs] def get_forces(self): return None
[docs] def get_outcar(self): return None
[docs] def get_incar(self): return None
[docs] def get_poscar(self): return None
[docs] def get_band_gap(self): '''Compute the band gap from the DOS''' dosdata = self.get_dos() if type(dosdata) == type(None): return None # cannot find DOS else: energy = dosdata.conditions.scalars dos = dosdata.scalars step_size = energy[1].value - energy[0].value not_found = True ; l = 0 ; bot = 10**3 ; top = -10**3 while not_found and l < len(dos): # iterate through the data e = float(energy[l].value) dens = float(dos[l].value) # note: dos already shifted by efermi if e < 0 and dens > 1e-3: bot = e elif e > 0 and dens > 1e-3: top = e not_found = False l += 1 if top < bot: raise Exception('Algorithm failed to find the band gap') elif top - bot < step_size*2: return Property(scalars=0, units='eV') else: bandgap = float(top-bot) return Property(scalars=round(bandgap,3), units='eV')