"""
A module for scheduling ARC jobs
Includes spawning, terminating, checking, and troubleshooting various jobs
"""
import datetime
import itertools
import os
import pprint
import shutil
import time
import numpy as np
from IPython.display import display
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
import arc.rmgdb as rmgdb
from arc import parser, plotter
from arc.checks.common import get_i_from_job_name, sum_time_delta
from arc.checks.ts import check_imaginary_frequencies, check_ts, check_irc_species_and_rxn
from arc.common import (extremum_list,
get_angle_in_180_range,
get_logger,
get_number_with_ordinal_indicator,
is_angle_linear,
read_yaml_file,
safe_copy_file,
save_yaml_file,
sort_two_lists_by_the_first,
torsions_to_scans,
)
from arc.exceptions import (InputError,
SanitizationError,
SchedulerError,
SpeciesError,
TrshError,
)
from arc.imports import settings
from arc.job.adapters.common import all_families_ts_adapters, default_incore_adapters, ts_adapters_by_rmg_family
from arc.job.factory import job_factory
from arc.job.local import check_running_jobs_ids
from arc.job.ssh import SSHClient
from arc.job.trsh import (scan_quality_check,
trsh_conformer_isomorphism,
trsh_ess_job,
trsh_negative_freq,
trsh_scan_job,
)
from arc.level import Level
from arc.species.species import (ARCSpecies,
are_coords_compliant_with_graph,
determine_rotor_symmetry,
TSGuess)
from arc.species.converter import (check_isomorphism,
compare_confs,
molecules_from_xyz,
xyz_to_coords_list,
xyz_to_str,
)
from arc.species.vectors import get_angle, calculate_dihedral_angle
if TYPE_CHECKING:
from arc.job.adapter import JobAdapter
from arc.reaction import ARCReaction
logger = get_logger()
LOWEST_MAJOR_TS_FREQ, HIGHEST_MAJOR_TS_FREQ, default_job_settings, \
default_job_types, default_ts_adapters, max_rotor_trsh, rotor_scan_resolution, servers_dict = \
settings['LOWEST_MAJOR_TS_FREQ'], settings['HIGHEST_MAJOR_TS_FREQ'], settings['default_job_settings'], \
settings['default_job_types'], settings['ts_adapters'], settings['max_rotor_trsh'], \
settings['rotor_scan_resolution'], settings['servers']
class Scheduler(object):
"""
ARC's Scheduler class. Creates jobs, submits, checks status, troubleshoots.
Each species in `species_list` has to have a unique label.
Dictionary structures::
job_dict = {label_1: {'conf_opt': {0: Job1,
1: Job2, ...},
'conf_sp': {0: Job1,
1: Job2, ...},
'tsg': {0: Job1,
1: Job2, ...}, # TS guesses
'opt': {job_name1: Job1,
job_name2: Job2, ...},
'sp': {job_name1: Job1,
job_name2: Job2, ...},
'freq': {job_name1: Job1,
job_name2: Job2, ...},
'composite': {job_name1: Job1,
job_name2: Job2, ...},
'scan': {job_name1: Job1,
job_name2: Job2, ...},
<job_type>: {job_name1: Job1,
job_name2: Job2, ...},
...
}
label_2: {...},
}
output = {label_1: {'job_types': {job_type1: <status1>, # boolean
job_type2: <status2>,
},
'paths': {'geo': <path to geometry optimization output file>,
'freq': <path to freq output file>,
'sp': <path to sp output file>,
'composite': <path to composite output file>,
'irc': [list of two IRC paths],
},
'conformers': <comments>,
'isomorphism': <comments>,
'convergence': <status>, # Optional[bool]
'restart': <comments>,
'info': <comments>,
'warnings': <comments>,
'errors': <comments>,
},
label_2: {...},
}
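For example (a hypothetical illustration), a converged optimization of a species
labeled 'C2H6' would be reflected in these structures as::
output['C2H6']['job_types']['opt']  # True
output['C2H6']['paths']['geo']  # path to the optimization output file
job_dict['C2H6']['opt']  # {'opt_a123': <Job instance>}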
Note:
The rotor scan dicts are located under Species.rotors_dict
Args:
project (str): The project's name. Used for naming the working directory.
ess_settings (dict): A dictionary of available ESS and a corresponding server list.
species_list (list): Contains input :ref:`ARCSpecies <species>` objects (both wells and TSs).
rxn_list (list): Contains input :ref:`ARCReaction <reaction>` objects.
project_directory (str): Folder path for the project: the input file path or ARC/Projects/project-name.
composite_method (str, optional): A composite method to use.
conformer_opt_level (Union[str, dict], optional): The level of theory to use for conformer comparisons.
conformer_sp_level (Union[str, dict], optional): The level of theory to use for conformer sp jobs.
opt_level (Union[str, dict], optional): The level of theory to use for geometry optimizations.
freq_level (Union[str, dict], optional): The level of theory to use for frequency calculations.
sp_level (Union[str, dict], optional): The level of theory to use for single point energy calculations.
scan_level (Union[str, dict], optional): The level of theory to use for torsion scans.
ts_guess_level (Union[str, dict], optional): The level of theory to use for TS guess comparisons.
irc_level (Union[str, dict], optional): The level of theory to use for IRC calculations.
orbitals_level (Union[str, dict], optional): The level of theory to use for calculating MOs (for plotting).
adaptive_levels (dict, optional): A dictionary of levels of theory for ranges of the number of heavy atoms
in the species. Keys are tuples of (min_num_atoms, max_num_atoms),
values are dictionaries with job type tuples as keys and levels of theory
as values. 'inf' is accepted in max_num_atoms
rmg_database (RMGDatabase, optional): The RMG database object.
job_types (dict, optional): A dictionary of job types to execute. Keys are job types, values are boolean.
bath_gas (str, optional): A bath gas. Currently used in OneDMin to calc L-J parameters.
Allowed values are He, Ne, Ar, Kr, H2, N2, O2.
restart_dict (dict, optional): A restart dictionary parsed from a YAML restart file.
max_job_time (float, optional): The maximal allowed job time on the server in hours (can be fractional).
allow_nonisomorphic_2d (bool, optional): Whether to optimize species even if they do not have a 3D conformer
that is isomorphic to the 2D graph representation.
memory (float, optional): The total allocated job memory in GB (14 by default).
testing (bool, optional): Used for internal ARC testing (generating the object w/o executing it).
dont_gen_confs (list, optional): A list of species labels for which conformer jobs were loaded from a restart
file, or user-requested. Additional conformer generation should be avoided.
n_confs (int, optional): The number of lowest force field conformers to consider.
e_confs (float, optional): The energy threshold in kJ/mol above the lowest energy conformer below which
force field conformers are considered.
fine_only (bool): If ``True`` ARC will not run optimization jobs without ``fine=True``.
kinetics_adapter (str, optional): The statmech software to use for kinetic rate coefficient calculations.
freq_scale_factor (float, optional): The harmonic frequencies scaling factor.
trsh_ess_jobs (bool, optional): Whether to attempt troubleshooting failed ESS jobs. Default is ``True``.
ts_adapters (list, optional): Entries represent different TS adapters.
report_e_elect (bool, optional): Whether to report electronic energy. Default is ``False``.
skip_nmd (bool, optional): Whether to skip normal mode displacement check. Default is ``False``.
output (dict, optional): Output dictionary with status per job type and final QM file paths for all species.
Attributes:
project (str): The project's name. Used for naming the working directory.
servers (list): A list of servers used for the present project.
species_list (list): Contains input :ref:`ARCSpecies <species>` objects (both species and TSs).
species_dict (dict): Keys are labels, values are :ref:`ARCSpecies <species>` objects.
rxn_list (list): Contains input :ref:`ARCReaction <reaction>` objects.
unique_species_labels (list): A list of species labels (checked for duplicates).
job_dict (dict): A dictionary of all scheduled jobs. Keys are species / TS labels,
values are dictionaries where keys are job names (corresponding to
'running_jobs' if job is running) and values are the Job objects.
running_jobs (dict): A dictionary of currently running jobs (a subset of `job_dict`).
Keys are species/TS label, values are lists of job names (e.g. 'conformer3', 'opt_a123').
server_job_ids (list): A list of relevant job IDs currently running on the server.
output (dict): Output dictionary with status per job type and final QM file paths for all species.
output_multi_spc (dict): Output dictionary with status per job type of multi-species clusters.
ess_settings (dict): A dictionary of available ESS and a corresponding server list.
restart_dict (dict): A restart dictionary parsed from a YAML restart file.
project_directory (str): Folder path for the project: the input file path or ARC/Projects/project-name.
save_restart (bool): Whether to start saving a restart file. ``True`` only after all species are loaded
(otherwise saves a partial file and may cause loss of information).
restart_path (str): Path to the `restart.yml` file to be saved.
max_job_time (float): The maximal allowed job time on the server in hours (can be fractional).
testing (bool): Used for internal ARC testing (generating the object w/o executing it).
rmg_database (RMGDatabase): The RMG database object.
allow_nonisomorphic_2d (bool): Whether to optimize species even if they do not have a 3D conformer that is
isomorphic to the 2D graph representation.
dont_gen_confs (list): A list of species labels for which conformer jobs were loaded from a restart file,
or user-requested. Additional conformer generation should be avoided for them.
memory (float): The total allocated job memory in GB (14 by default).
n_confs (int): The number of lowest force field conformers to consider.
e_confs (float): The energy threshold in kJ/mol above the lowest energy conformer below which
force field conformers are considered.
job_types (dict): A dictionary of job types to execute. Keys are job types, values are boolean.
bath_gas (str): A bath gas. Currently used in OneDMin to calc L-J parameters.
Allowed values are He, Ne, Ar, Kr, H2, N2, O2.
composite_method (str): A composite method to use.
conformer_opt_level (dict): The level of theory to use for conformer comparisons.
conformer_sp_level (dict): The level of theory to use for conformer sp jobs.
opt_level (dict): The level of theory to use for geometry optimizations.
freq_level (dict): The level of theory to use for frequency calculations.
sp_level (dict): The level of theory to use for single point energy calculations.
scan_level (dict): The level of theory to use for torsion scans.
ts_guess_level (dict): The level of theory to use for TS guess comparisons.
irc_level (dict): The level of theory to use for IRC calculations.
orbitals_level (dict): The level of theory to use for calculating MOs (for plotting).
adaptive_levels (dict): A dictionary of levels of theory for ranges of the number of heavy atoms
in the species. Keys are tuples of (min_num_atoms, max_num_atoms),
values are dictionaries with job type tuples as keys and levels of theory
as values. 'inf' is accepted in max_num_atoms
fine_only (bool): If ``True`` ARC will not run optimization jobs without ``fine=True``.
kinetics_adapter (str): The statmech software to use for kinetic rate coefficient calculations.
freq_scale_factor (float): The harmonic frequencies scaling factor.
trsh_ess_jobs (bool): Whether to attempt troubleshooting failed ESS jobs. Default is ``True``.
ts_adapters (list): Entries represent different TS adapters.
report_e_elect (bool): Whether to report electronic energy.
skip_nmd (bool): Whether to skip normal mode displacement check.
"""
def __init__(self,
project: str,
ess_settings: dict,
species_list: list,
project_directory: str,
composite_method: Optional[Level] = None,
conformer_opt_level: Optional[Level] = None,
conformer_sp_level: Optional[Level] = None,
opt_level: Optional[Level] = None,
freq_level: Optional[Level] = None,
sp_level: Optional[Level] = None,
scan_level: Optional[Level] = None,
ts_guess_level: Optional[Level] = None,
irc_level: Optional[Level] = None,
orbitals_level: Optional[Level] = None,
adaptive_levels: Optional[dict] = None,
rmg_database: Optional['RMGDatabase'] = None,
job_types: Optional[dict] = None,
rxn_list: Optional[list] = None,
bath_gas: Optional[str] = None,
restart_dict: Optional[dict] = None,
max_job_time: Optional[float] = None,
allow_nonisomorphic_2d: Optional[bool] = False,
memory: Optional[float] = None,
testing: Optional[bool] = False,
dont_gen_confs: Optional[list] = None,
n_confs: Optional[int] = 10,
e_confs: Optional[float] = 5,
fine_only: Optional[bool] = False,
trsh_ess_jobs: Optional[bool] = True,
kinetics_adapter: str = 'arkane',
freq_scale_factor: float = 1.0,
ts_adapters: Optional[List[str]] = None,
report_e_elect: Optional[bool] = False,
skip_nmd: Optional[bool] = False,
output: Optional[dict] = None,
) -> None:
self.project = project
self.ess_settings = ess_settings
self.species_list = species_list
self.project_directory = project_directory
self.rmg_database = rmg_database or rmgdb.make_rmg_database_object()
self.restart_dict = restart_dict
self.rxn_list = rxn_list if rxn_list is not None else list()
self.max_job_time = max_job_time or default_job_settings.get('job_time_limit_hrs', 120)
self.job_dict = dict()
self.server_job_ids = list()
self.completed_incore_jobs = list()
self.running_jobs = dict()
self.allow_nonisomorphic_2d = allow_nonisomorphic_2d
self.testing = testing
self.memory = memory or default_job_settings.get('job_total_memory_gb', 14)
self.bath_gas = bath_gas
self.adaptive_levels = adaptive_levels
self.n_confs = n_confs
self.e_confs = e_confs
self.dont_gen_confs = dont_gen_confs or list()
self.job_types = job_types if job_types is not None else default_job_types
self.fine_only = fine_only
self.trsh_ess_jobs = trsh_ess_jobs
self.kinetics_adapter = kinetics_adapter
self.freq_scale_factor = freq_scale_factor
self.ts_adapters = ts_adapters if ts_adapters is not None else default_ts_adapters
self.ts_adapters = [ts_adapter.lower() for ts_adapter in self.ts_adapters]
self.output = output or dict()
self.output_multi_spc = dict()
self.report_e_elect = report_e_elect
self.skip_nmd = skip_nmd
self.species_dict, self.rxn_dict = dict(), dict()
for species in self.species_list:
self.species_dict[species.label] = species
for rxn in self.rxn_list:
self.rxn_dict[rxn.index] = rxn
if self.restart_dict is not None:
self.output = self.restart_dict['output'] if 'output' in self.restart_dict else dict()
self.output_multi_spc = self.restart_dict['output_multi_spc'] if 'output_multi_spc' in self.restart_dict else dict()
if 'running_jobs' in self.restart_dict:
self.restore_running_jobs()
self.initialize_output_dict()
self.restart_path = os.path.join(self.project_directory, 'restart.yml')
self.report_time = time.time() # init time for reporting status every 1 hr
self.servers = list()
self.composite_method = composite_method
self.conformer_opt_level = conformer_opt_level
self.conformer_sp_level = conformer_sp_level
self.ts_guess_level = ts_guess_level
self.opt_level = opt_level
self.freq_level = freq_level
self.sp_level = sp_level
self.scan_level = scan_level
self.irc_level = irc_level
self.orbitals_level = orbitals_level
self.unique_species_labels = list()
self.save_restart = False
if len(self.rxn_list):
rxn_info_path = self.make_reaction_labels_info_file()
logger.info("\nLoading RMG's families...")
rmgdb.load_families_only(self.rmg_database)
for rxn in self.rxn_list:
logger.info('\n\n')
# 1. Update the ARCReaction object and generate an ARCSpecies object for its TS.
rxn.r_species, rxn.p_species = list(), list()
for spc in self.species_list:
if spc.label in rxn.reactants:
rxn.r_species.append(spc)
if spc.label in rxn.products:
rxn.p_species.append(spc)
rxn.rmg_reaction_from_arc_species()
rxn.check_attributes()
rxn.determine_family(rmg_database=self.rmg_database)
family_text = ''
if rxn.family is not None:
family_text = f'identified as belonging to RMG family {rxn.family.label}'
logger.info(f'Considering reaction: {rxn.label}')
if family_text:
logger.info(f'({family_text})')
if rxn.rmg_reaction is not None:
display(rxn.rmg_reaction.copy())
rxn.ts_label = rxn.ts_label if rxn.ts_label is not None else f'TS{rxn.index}'
with open(rxn_info_path, 'a') as f:
f.write(f'{rxn.ts_label}: {rxn.label}')
if family_text:
family_text = f'\n({family_text})'
f.write(family_text)
f.write('\n\n')
# 2. Create the TS Species object if needed.
if not any([spc.label == rxn.ts_label for spc in self.species_list]):
ts_species = ARCSpecies(
is_ts=True,
label=rxn.ts_label,
rxn_label=rxn.label,
rxn_index=rxn.index,
multiplicity=rxn.multiplicity,
charge=rxn.charge,
compute_thermo=False,
ts_number=rxn.index,
preserve_param_in_scan=rxn.preserve_param_in_scan,
)
ts_species.number_of_atoms = sum(reactant.number_of_atoms for reactant in rxn.r_species)
self.species_list.append(ts_species)
self.species_dict[ts_species.label] = ts_species
self.initialize_output_dict(ts_species.label)
else:
# The TS species was already loaded from a restart dict or an Arkane YAML file.
ts_species = None
for spc in self.species_list:
if spc.label == rxn.ts_label:
ts_species = spc
if ts_species.rxn_label is None:
ts_species.rxn_label = rxn.label
if ts_species.rxn_index is None:
ts_species.rxn_index = rxn.index
break
if ts_species is None:
raise SchedulerError(f'Could not identify a TS species for {rxn}')
rxn.ts_species = ts_species
# 3. Generate TSGuess objects for all methods, start with the user guesses
for i, user_guess in enumerate(rxn.ts_xyz_guess): # This is a list of user guesses, could be empty.
ts_species.ts_guesses.append(
TSGuess(method=f'user guess {i}',
xyz=user_guess,
rmg_reaction=rxn.rmg_reaction,
index=len(rxn.ts_species.ts_guesses),
success=True,
project_directory=self.project_directory,
)
)
rxn.check_atom_balance()
rxn.check_done_opt_r_n_p()
logger.info('\n\n')
for species in self.species_list:
if not isinstance(species, ARCSpecies):
raise SpeciesError(f"Each species in 'species_list' must be an ARCSpecies object. "
f"Got type {type(species)} for {species.label}")
if species.label in self.unique_species_labels:
raise SpeciesError(f"Each species in 'species_list' has to have a unique label. "
f"Label of species {species.label} is not unique.")
if species.mol is None and not species.is_ts:
# we'll attempt to infer .mol for a TS after we obtain its xyz
# for a non-TS, this attribute should already be set by this point
self.output[species.label]['errors'] = 'Could not infer a 2D graph (a .mol species attribute); '
if species.multi_species is None:
self.unique_species_labels.append(species.label)
elif species.multi_species not in self.unique_species_labels:
self.unique_species_labels.append(species.multi_species)
if self._does_output_dict_contain_info() and species.label in list(self.output.keys()):
self.output[species.label]['restart'] += f'Restarted ARC at {datetime.datetime.now()}; '
if species.label not in self.job_dict:
self.job_dict[species.label] = dict()
if species.yml_path is None:
if self.job_types['rotors'] and not species.number_of_rotors and species.rotors_dict is not None:
# if species.rotors_dict is None, it means the species is marked to not spawn rotor scans
species.determine_rotors()
if not self.job_types['opt'] and species.final_xyz is not None:
# opt wasn't asked for, and it's not needed, declare it as converged
self.output[species.label]['job_types']['opt'] = True
if not self.job_types['conf_opt'] and len(species.conformers) == 1:
# conformers opt weren't asked for, assign initial_xyz
species.initial_xyz = species.conformers[0]
if species.label not in self.running_jobs:
self.running_jobs[species.label if not species.multi_species else species.multi_species] = list()
if self.output[species.label]['convergence']:
continue
if species.is_monoatomic():
if not self.output[species.label]['job_types']['sp'] \
and not self.output[species.label]['job_types']['composite'] \
and 'sp' not in list(self.job_dict[species.label].keys()) \
and 'composite' not in list(self.job_dict[species.label].keys()):
# No need to run opt/freq jobs for a monoatomic species, only run sp (or composite if relevant)
if self.composite_method:
self.run_composite_job(species.label)
else:
self.run_sp_job(label=species.label)
if self.job_types['onedmin']:
self.run_onedmin_job(species.label)
elif species.get_xyz(generate=False) and not self.job_types['conf_opt'] and not self.job_types['opt'] \
and species.irc_label is None:
if self.job_types['freq']:
self.run_freq_job(species.label)
if self.job_types['sp']:
self.run_sp_job(species.label)
if self.job_types['rotors']:
self.run_sp_job(species.label, level=self.scan_level)
if not self.job_types['opt']:
self.run_scan_jobs(species.label)
elif ((species.initial_xyz is not None or species.final_xyz is not None)
or species.is_ts and species.rxn_label is None) and not self.testing:
# For restarting purposes: check before running jobs whether they were already terminated
# (check self.output) or whether they are "currently running" (check self.job_dict)
# This section takes care of restarting a Species (including a TS), but does not
# deal with conformers nor with ts_guesses
if self.composite_method:
# composite-related restart
if not self.output[species.label]['job_types']['composite'] \
and 'composite' not in list(self.job_dict[species.label].keys())\
and not os.path.isfile(self.output[species.label]['paths']['geo']):
# doing composite; composite hasn't finished and is not running; spawn composite
self.run_composite_job(species.label)
elif 'composite' not in list(self.job_dict[species.label].keys()) \
and species.irc_label is None:
# composite is done; do other jobs
if not self.output[species.label]['job_types']['freq'] \
and 'freq' not in list(self.job_dict[species.label].keys()) \
and (species.is_ts or species.number_of_atoms > 1):
self.run_freq_job(species.label)
if self.job_types['rotors']:
self.run_scan_jobs(species.label)
else:
# non-composite-related restart
if ('opt' not in self.job_dict[species.label].keys() and not self.job_types['fine']) or \
(self.job_types['fine'] and 'opt' not in list(self.job_dict[species.label].keys())
and 'fine' not in list(self.job_dict[species.label].keys())):
# opt/fine isn't running
if not self.output[species.label]['paths']['geo'] and self.job_types['opt']:
# opt/fine hasn't finished (and isn't running), so run it
self.run_opt_job(species.label, fine=self.fine_only)
if self.output[species.label]['paths']['geo'] and 'sp' not in self.job_dict[species.label].keys() \
and not self.output[species.label]['paths']['sp'] and self.job_types['sp'] \
and species.irc_label is None:
self.run_sp_job(species.label)
if self.output[species.label]['paths']['geo'] and 'freq' not in self.job_dict[species.label].keys() \
and not self.output[species.label]['paths']['freq'] and self.job_types['freq'] \
and (species.is_ts or species.number_of_atoms > 1) and species.irc_label is None:
self.run_freq_job(species.label)
if self.output[species.label]['paths']['geo'] and self.job_types['rotors'] and \
any(spc.rotors_dict is not None and
any(rotor_dict['success'] is False and not rotor_dict['invalidation_reason']
for rotor_dict in spc.rotors_dict.values())
for spc in self.species_list):
# Additional restart-related checks are performed within run_scan_jobs().
self.run_scan_jobs(species.label)
else:
# Species is loaded from an Arkane YAML file (no need to execute any job)
self.output[species.label]['convergence'] = True
self.output[species.label]['info'] += 'Loaded from an Arkane YAML file; '
if species.is_ts:
# This is a TS loaded from a YAML file
species.ts_conf_spawned = True
self.save_restart = True
self.timer = True
if not self.testing:
self.schedule_jobs()
def schedule_jobs(self):
"""
The main job scheduling block
"""
for species in self.species_dict.values():
if species.initial_xyz is None and species.final_xyz is None and species.conformers \
and any([e is not None for e in species.conformer_energies]):
# The species has no xyz, but has conformers and at least one of the conformers has energy.
self.determine_most_stable_conformer(species.label)
if species.initial_xyz is not None:
if self.composite_method:
self.run_composite_job(species.label)
else:
self.run_opt_job(species.label, fine=self.fine_only)
self.run_conformer_jobs()
self.spawn_ts_jobs() # If all reactants/products are already known (Arkane yml or restart), spawn TS searches.
while self.running_jobs != {}:
self.timer = True
for label in self.unique_species_labels:
if label in self.output and self.output[label]['convergence'] is False:
# Skip unconverged species.
if label in self.running_jobs:
del self.running_jobs[label]
continue
# Look for completed jobs and decide what jobs to run next.
self.get_server_job_ids() # updates ``self.server_job_ids``
self.get_completed_incore_jobs() # updates ``self.completed_incore_jobs``
if label not in self.running_jobs.keys():
continue
job_list = self.running_jobs[label]
for job_name in job_list:
if 'conf' in job_name:
i = get_i_from_job_name(job_name)
job = self.job_dict[label]['conf_opt'][i] if 'conf_opt' in job_name \
else self.job_dict[label]['conf_sp'][i]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
# this is a completed conformer job
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
troubleshooting_conformer = self.parse_conformer(job=job, label=label, i=i)
if 'conf_opt' in job_name and self.job_types['conf_sp'] and not troubleshooting_conformer:
self.run_sp_job(label=label,
level=self.conformer_sp_level,
conformer=i)
if troubleshooting_conformer:
break
# Just terminated a conformer job.
# Are there additional conformer jobs currently running for this species?
for spec_jobs in job_list:
if ('conf_opt' in spec_jobs or 'conf_sp' in spec_jobs) and spec_jobs != job_name:
break
else:
# All conformer jobs terminated.
# Check isomorphism and run opt on most stable conformer geometry.
logger.info(f'\nConformer jobs for {label} successfully terminated.\n')
if self.species_dict[label].is_ts:
self.determine_most_likely_ts_conformer(label)
else:
self.determine_most_stable_conformer(label, sp_flag=bool(self.job_types['conf_sp'])) # also checks isomorphism
if self.species_dict[label].initial_xyz is not None:
# if initial_xyz is None, then we're probably troubleshooting conformers, don't opt
if not self.composite_method:
self.run_opt_job(label, fine=self.fine_only)
else:
self.run_composite_job(label)
self.timer = False
break
if 'tsg' in job_name:
job = self.job_dict[label]['tsg'][get_i_from_job_name(job_name)]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
# This is a successfully completed tsg job. It may have resulted in several TSGuesses.
self.end_job(job=job, label=label, job_name=job_name)
if job.local_path_to_output_file.endswith('.yml'):
for rxn in job.reactions:
rxn.ts_species.process_completed_tsg_queue_jobs(yml_path=job.local_path_to_output_file)
# Just terminated a tsg job.
# Are there additional tsg jobs currently running for this species?
for spec_jobs in job_list:
if 'tsg' in spec_jobs and spec_jobs != job_name:
break
else:
# All tsg jobs terminated. Spawn confs.
logger.info(f'\nTS guess jobs for {label} successfully terminated.\n')
self.run_conformer_jobs(labels=[label])
self.timer = False
break
elif 'opt' in job_name and 'conf_opt' not in job_name:
# job_name is 'opt1', 'opt2', etc., or 'optfreq1', 'optfreq2', etc.
job = self.job_dict[label]['opt'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
multi_species = any(spc.multi_species == label for spc in self.species_list)
if multi_species:
self.multi_species_path_dict = plotter.make_multi_species_output_file(species_list=self.species_list,
label=label,
path=job.local_path_to_xyz or job.local_path_to_output_file)
success = self.parse_opt_geo(label=label, job=job)
if success:
if not self.job_types['sp']:
self.parse_opt_e_elect(label=label, job=job)
self.spawn_post_opt_jobs(label=label, job_name=job_name)
if multi_species:
plotter.delete_multi_species_output_file(species_list=self.species_list,
label=label,
multi_species_path_dict=self.multi_species_path_dict
)
self.timer = False
break
elif 'freq' in job_name:
# this is NOT an 'optfreq' job
job = self.job_dict[label]['freq'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
self.check_freq_job(label=label, job=job)
self.timer = False
break
elif 'sp' in job_name and 'conf_sp' not in job_name:
job = self.job_dict[label]['sp'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
self.check_sp_job(label=label, job=job)
self.timer = False
break
elif 'composite' in job_name:
job = self.job_dict[label]['composite'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
success = self.parse_composite_geo(label=label, job=job)
if success:
self.spawn_post_opt_jobs(label=label, job_name=job_name)
self.timer = False
break
elif 'directed_scan' in job_name:
job = self.job_dict[label]['directed_scan'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
self.check_directed_scan_job(label=label, job=job)
if 'cont' in job.directed_scan_type and job.job_status[1]['status'] == 'done':
# This is a continuous restricted optimization, spawn the next job in the scan.
xyz = parser.parse_xyz_from_file(job.local_path_to_output_file) \
if not hasattr(job, 'opt_xyz') else job.opt_xyz
self.spawn_directed_scan_jobs(label=label, rotor_index=job.rotor_index, xyz=xyz)
if 'brute_force' in job.directed_scan_type:
# Just terminated a brute_force directed scan job.
# Are there additional jobs of the same type currently running for this species?
self.species_dict[label].rotors_dict[job.rotor_index]['number_of_running_jobs'] -= 1
if not self.species_dict[label].rotors_dict[job.rotor_index]['number_of_running_jobs']:
# All brute force scan jobs for these pivots terminated.
logger.info(f'\nAll brute force directed scan jobs for species {label} between '
f'pivots {job.pivots} successfully terminated.\n')
self.process_directed_scans(label, pivots=job.pivots)
shutil.rmtree(job.local_path, ignore_errors=True)
self.timer = False
break
elif 'scan' in job_name and 'directed' not in job_name:
job = self.job_dict[label]['scan'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination \
and (job.directed_scan_type is None or job.directed_scan_type == 'ess'):
self.check_scan_job(label=label, job=job)
elif successful_server_termination and job.job_status[1]['status'] == 'errored':
self.troubleshoot_ess(label=label, job=job, level_of_theory=job.level)
self.timer = False
break
elif 'irc' in job_name:
job = self.job_dict[label]['irc'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
self.spawn_post_irc_jobs(label=label, job=job)
self.timer = False
break
elif 'orbitals' in job_name:
job = self.job_dict[label]['orbitals'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
# copy the orbitals file to the species / TS output folder
folder_name = 'rxns' if self.species_dict[label].is_ts else 'Species'
orbitals_path = os.path.join(self.project_directory, 'output', folder_name, label,
'geometry', 'orbitals.fchk')
if os.path.isfile(job.local_path_to_orbitals_file):
try:
shutil.copyfile(job.local_path_to_orbitals_file, orbitals_path)
except shutil.SameFileError:
pass
self.timer = False
break
elif 'onedmin' in job_name:
job = self.job_dict[label]['onedmin'][job_name]
if not (job.job_id in self.server_job_ids and job.job_id not in self.completed_incore_jobs):
successful_server_termination = self.end_job(job=job, label=label, job_name=job_name)
if successful_server_termination:
# Copy the lennard_jones file to the species output folder (TS's don't have L-J data).
lj_output_path = os.path.join(self.project_directory, 'output', 'Species', label,
'lennard_jones.dat')
if os.path.isfile(job.local_path_to_lj_file):
try:
shutil.copyfile(job.local_path_to_lj_file, lj_output_path)
except shutil.SameFileError:
pass
self.output[label]['job_types']['onedmin'] = True
self.species_dict[label].set_transport_data(
lj_path=os.path.join(self.project_directory, 'output', 'Species', label,
'lennard_jones.dat'),
opt_path=self.output[label]['paths']['geo'], bath_gas=job.bath_gas,
opt_level=self.opt_level)
self.timer = False
break
if not len(job_list):
self.check_all_done(label)
if not self.running_jobs[label]:
# Delete the label only if it represents an empty entry.
del self.running_jobs[label]
if self.timer and len(job_list):
time.sleep(30) # wait 30 sec before bugging the servers again.
t = time.time() - self.report_time
if t > 3600 and self.running_jobs:
self.report_time = time.time()
logger.info(f'Currently running jobs:\n{pprint.pformat(self.running_jobs)}')
# Generate a TS report:
self.generate_final_ts_guess_report()
def run_job(self,
job_type: str,
conformer: Optional[int] = None,
cpu_cores: Optional[int] = None,
dihedral_increment: Optional[float] = None,
dihedrals: Optional[list] = None,
directed_scan_type: Optional[str] = None,
ess_trsh_methods: Optional[list] = None,
fine: Optional[bool] = False,
irc_direction: Optional[str] = None,
job_adapter: Optional[str] = None,
label: Optional[Union[str, List[str]]] = None,
level_of_theory: Optional[Union[Level, dict, str]] = None,
memory: Optional[int] = None,
max_job_time: Optional[int] = None,
rotor_index: Optional[int] = None,
reactions: Optional[List['ARCReaction']] = None,
queue: Optional[str] = None,
attempted_queues: Optional[list] = None,
scan_trsh: Optional[str] = '',
shift: Optional[str] = '',
trsh: Optional[Union[str, dict, list]] = None,
torsions: Optional[List[List[int]]] = None,
times_rerun: int = 0,
tsg: Optional[int] = None,
xyz: Optional[Union[dict, List[dict]]] = None,
):
"""
A helper function for running (all) jobs.
Args:
job_type (str): The type of job to run.
conformer (int, optional): Conformer number if optimizing conformers.
cpu_cores (int, optional): The total number of cpu cores requested for a job.
dihedral_increment (float, optional): The degrees increment to use when scanning dihedrals of TS guesses.
dihedrals (list, optional): The dihedral angles of a directed scan job corresponding to ``torsions``.
directed_scan_type (str, optional): The type of the directed scan.
ess_trsh_methods (list, optional): A list of troubleshooting methods already tried out for ESS convergence.
fine (bool, optional): Whether to run an optimization job with a fine grid. `True` to use fine.
irc_direction (str, optional): The direction to run the IRC computation.
job_adapter (str, optional): An ESS software to use.
label (Union[str, List[str]], optional): The species label, or a list of labels in case of multispecies.
level_of_theory (Level, optional): The level of theory to use.
memory (int, optional): The total job allocated memory in GB.
max_job_time (int, optional): The maximal allowed job time on the server in hours.
rotor_index (int, optional): The 0-indexed rotor number (key) in the species.rotors_dict dictionary.
reactions (List[ARCReaction], optional): Entries are ARCReaction instances, used for TS search methods.
queue (str, optional): The name of the queue to submit the job to.
attempted_queues (list, optional): Queues that were already attempted for this job.
scan_trsh (str, optional): A troubleshooting method for rotor scans.
shift (str, optional): A string representation alpha- and beta-spin orbitals shifts (molpro only).
times_rerun (int, optional): Number of times this job was re-run with the same arguments (no trsh methods).
torsions (List[List[int]], optional): The 0-indexed atom indices of the torsion(s).
trsh (Union[str, dict, list], optional): A troubleshooting keyword, or a collection of keywords, to be used in input files.
tsg (int, optional): TSGuess number if optimizing TS guesses.
xyz (Union[dict, List[dict]], optional): The 3D coordinates for the species.
"""
max_job_time = max_job_time or self.max_job_time # if it's None, set to default
ess_trsh_methods = ess_trsh_methods if ess_trsh_methods is not None else list()
species = None
if isinstance(label, str) and label in self.output:
species = self.species_dict[label]
elif isinstance(label, list):
species = [spc for spc in self.species_list if spc.label in label]
run_multi_species = all([spc.multi_species is not None for spc in species]) if isinstance(species, list) else False
memory = memory if memory is not None else self.memory
checkfile = self.species_dict[label].checkfile if isinstance(label, str) else None
if torsions is None and rotor_index is not None:
torsions = species.rotors_dict[rotor_index]['torsion']
torsions = [torsions] if not isinstance(torsions[0], list) else torsions
if self.adaptive_levels is not None and label is not None:
level_of_theory = self.determine_adaptive_level(original_level_of_theory=level_of_theory, job_type=job_type,
heavy_atoms=self.species_dict[label].number_of_heavy_atoms)
job_adapter = job_adapter.lower() if job_adapter is not None else \
self.deduce_job_adapter(level=Level(repr=level_of_theory), job_type=job_type)
args = {'keyword': {}, 'block': {}}
if trsh:
if isinstance(trsh, (str, list)):
args['trsh'] = {'trsh': trsh}
elif isinstance(trsh, dict) and 'trsh' in args:
for key, value in trsh.items():
if key in args['trsh'] and isinstance(args['trsh'][key], list) and isinstance(value, list):
args['trsh'][key].extend(value)
else:
args['trsh'][key] = value
else:
args['trsh'] = trsh
if shift:
args['shift'] = shift
if scan_trsh:
args['keyword']['scan_trsh'] = scan_trsh
if isinstance(level_of_theory, Level) and level_of_theory.args is not None:
args.update(level_of_theory.args)
job = job_factory(job_adapter=job_adapter,
project=self.project,
project_directory=self.project_directory,
job_type=job_type,
level=Level(repr=level_of_theory) if level_of_theory is not None else None,
args=args,
bath_gas=self.bath_gas,
checkfile=checkfile,
conformer=conformer,
constraints=None,
cpu_cores=cpu_cores,
dihedral_increment=dihedral_increment,
dihedrals=dihedrals,
directed_scan_type=directed_scan_type,
ess_settings=self.ess_settings,
ess_trsh_methods=ess_trsh_methods,
execution_type='incore' if job_adapter in default_incore_adapters else 'queue',
fine=fine,
irc_direction=irc_direction,
job_memory_gb=memory,
max_job_time=max_job_time,
reactions=[reactions] if reactions is not None and not isinstance(reactions, list) else reactions,
rotor_index=rotor_index,
server_nodes=None,
queue=queue,
attempted_queues=attempted_queues if attempted_queues is not None else list(),
species=[species] if species is not None and not isinstance(species, list) else species,
times_rerun=times_rerun,
torsions=torsions,
tsg=tsg,
xyz=xyz,
run_multi_species=run_multi_species,
)
label = label or reactions[0].ts_species.label
label = species[0].multi_species if run_multi_species else label
if label not in self.job_dict.keys():
self.job_dict[label] = dict()
if conformer is None and tsg is None:
# this is NOT a conformer DFT job nor a TS guess job
self.running_jobs[label] = list() if label not in self.running_jobs else self.running_jobs[label]
self.running_jobs[label].append(job.job_name) # mark as a running job
if job_type not in self.job_dict[label].keys():
# Jobs of this type haven't been spawned for label
self.job_dict[label][job_type] = dict()
self.job_dict[label][job_type][job.job_name] = job
elif conformer is not None:
# Running a conformer DFT job. Append differently to job_dict.
self.running_jobs[label] = list() if label not in self.running_jobs else self.running_jobs[label]
self.running_jobs[label].append(f'{job_type}_{conformer}') # mark as a running job
if 'conf_opt' not in self.job_dict[label]:
self.job_dict[label]['conf_opt'] = dict()
if 'conf_sp' not in self.job_dict[label] and job_type == 'conf_sp':
self.job_dict[label]['conf_sp'] = dict()
self.job_dict[label][job_type][conformer] = job # save job object
elif tsg is not None:
# Running a TS guess job. Append differently to job_dict.
self.running_jobs[label] = list() if label not in self.running_jobs else self.running_jobs[label]
self.running_jobs[label].append(f'tsg{tsg}') # mark as a running job
if 'tsg' not in self.job_dict[label]:
self.job_dict[label]['tsg'] = dict()
self.job_dict[label]['tsg'][tsg] = job # save job object
if job.server is not None and job.server not in self.servers:
self.servers.append(job.server)
self.check_max_simultaneous_jobs_limit(job.server)
job.execute()
self.save_restart_dict()
def deduce_job_adapter(self, level: Level, job_type: str) -> str:
"""
Deduce the job adapter (the software) to be used for jobs other than TS searches.
Args:
level (Level): The level of theory that will be used for the job.
job_type (str): The job's type.
Returns: str
The deduced job adapter.
"""
level.deduce_software(job_type=job_type)
if level.software is not None:
job_adapter = level.software
else:
logger.error(f'Could not determine software for job type {job_type}')
logger.error(f'Using level_of_theory: {level}')
available_ess = list(self.ess_settings.keys())
if 'gaussian' in available_ess:
logger.error('Setting it to Gaussian')
level.software = 'gaussian'
elif 'qchem' in available_ess:
logger.error('Setting it to QChem')
level.software = 'qchem'
elif 'orca' in available_ess:
logger.error('Setting it to Orca')
level.software = 'orca'
elif 'molpro' in available_ess:
logger.error('Setting it to Molpro')
level.software = 'molpro'
elif 'terachem' in available_ess:
logger.error('Setting it to TeraChem')
level.software = 'terachem'
job_adapter = level.software
return job_adapter.lower()
def end_job(self, job: 'JobAdapter',
label: str,
job_name: str,
) -> bool:
"""
A helper function for checking job status, saving in csv file, and downloading output files if needed.
Args:
job (JobAdapter): The job object.
label (str): The species label.
job_name (str): The job name from the running_jobs dict.
Returns:
bool: ``True`` if job terminated successfully on the server, ``False`` otherwise.
"""
if job.job_status[0] != 'done' or job.job_status[1]['status'] != 'done':
try:
job.determine_job_status() # Also downloads the output file.
except IOError:
if job.job_type not in ['orbitals']:
logger.warning(f'Tried to determine status of job {job.job_name}, '
f'but it seems like the job never ran. Re-running job.')
self._run_a_job(job=job, label=label)
if job_name in self.running_jobs[label]:
self.running_jobs[label].pop(self.running_jobs[label].index(job_name))
if job.job_status[1]['status'] == 'errored' and job.job_status[1]['keywords'] == ['memory']:
original_mem = job.job_memory_gb
if 'insufficient job memory' in job.job_status[1]['error'].lower():
job.job_memory_gb *= 3
logger.warning(f'Job {job.job_name} errored because of insufficient memory. '
f'Was {original_mem} GB, rerunning job with {job.job_memory_gb} GB.')
self._run_a_job(job=job, label=label)
elif 'memory requested is too high' in job.job_status[1]['error'].lower():
used_mem = None
if 'used only' in job.job_status[1]['error']:
# Parse the memory actually used, e.g., '... used only 4 GB ...' -> 4 (message format assumed).
used_mem = int(job.job_status[1]['error'].split()[-2])
job.job_memory_gb = used_mem * 4.5 if used_mem is not None else job.job_memory_gb * 0.5
logger.warning(f'Job {job.job_name} errored because the requested memory is too high. '
f'Was {original_mem} GB, rerunning job with {job.job_memory_gb} GB.')
self._run_a_job(job=job, label=label)
if job.job_status[1]['status'] == 'errored' and job.job_status[1]['keywords'] == ['ServerTimeLimit']:
logger.warning(f'Job {job.job_name} errored because of a server time limit. '
f'Rerunning job with {job.max_job_time * 2} hours.')
job.max_job_time *= 2
run_again = job.troubleshoot_queue()
if run_again:
self._run_a_job(job=job, label=label)
if job_name in self.running_jobs[label]:
self.running_jobs[label].pop(self.running_jobs[label].index(job_name))
return False
if not os.path.isfile(job.local_path_to_output_file) and not job.execution_type == 'incore':
job.rename_output_file()
if not os.path.isfile(job.local_path_to_output_file) and not job.execution_type == 'incore':
if 'restart_due_to_file_not_found' in job.ess_trsh_methods:
job.job_status[0] = 'errored'
job.job_status[1]['status'] = 'errored'
logger.warning(f'Job {job.job_name} errored because, for the second time, ARC did not find '
f'the output file at {job.local_path_to_output_file}.')
elif job.job_type not in ['orbitals']:
job.ess_trsh_methods.append('restart_due_to_file_not_found')
logger.warning(f'Did not find the output file of job {job.job_name} with path '
f'{job.local_path_to_output_file}. Maybe the job never ran. Re-running job.')
self._run_a_job(job=job, label=label)
if job_name in self.running_jobs[label]:
self.running_jobs[label].pop(self.running_jobs[label].index(job_name))
return False
if job.job_status[0] != 'running' and job.job_status[1]['status'] != 'running':
if job_name in self.running_jobs[label]:
self.running_jobs[label].pop(self.running_jobs[label].index(job_name))
self.timer = False
job.write_completed_job_to_csv_file()
logger.info(f' Ending job {job_name} for {label} (run time: {job.run_time})')
if job.job_status[0] != 'done':
return False
if job.job_adapter in ['gaussian', 'terachem'] and job.job_type in ['opt', 'optfreq', 'composite']:
check_path = os.path.join(job.local_path, 'check.chk')
if os.path.isfile(check_path):
if 'directed_scan' in job.job_name and 'cont' in job.directed_scan_type:
folder_name = 'rxns' if job.is_ts else 'Species'
r_path = os.path.join(self.project_directory, 'output', folder_name, job.species_label, 'rotors')
if not os.path.isdir(r_path):
os.makedirs(r_path)
shutil.copyfile(src=check_path, dst=os.path.join(r_path, 'directed_rotor_check.chk'))
self.species_dict[label].checkfile = os.path.join(r_path, 'directed_rotor_check.chk')
elif label in self.output:
self.species_dict[label].checkfile = check_path
if job.job_type == 'scan' or job.directed_scan_type == 'ess':
for rotors_dict in self.species_dict[label].rotors_dict.values():
if rotors_dict['pivots'] in [job.pivots, job.pivots[0]]:
rotors_dict['scan_path'] = job.local_path_to_output_file
self.save_restart_dict()
return True
def _run_a_job(self,
job: 'JobAdapter',
label: str,
rerun: bool = False,
):
"""
A helper function to run an ARC job (used internally).
Args:
job (JobAdapter): The job object.
label (str): The species label.
rerun (bool, optional): Whether this job is being re-run.
"""
self.run_job(job_type=job.job_type,
conformer=job.conformer,
cpu_cores=job.cpu_cores,
dihedrals=job.dihedrals,
directed_scan_type=job.directed_scan_type,
ess_trsh_methods=job.ess_trsh_methods,
fine=job.fine,
irc_direction=job.irc_direction,
job_adapter=job.job_adapter,
label=label,
level_of_theory=job.level,
memory=job.job_memory_gb,
max_job_time=job.max_job_time,
rotor_index=job.rotor_index,
reactions=job.reactions,
queue=job.queue,
attempted_queues=job.attempted_queues if job.attempted_queues is not None else list(),
trsh=job.args['trsh'] if 'trsh' in job.args else {},
torsions=job.torsions,
times_rerun=job.times_rerun + int(rerun),
tsg=job.tsg,
xyz=job.xyz,
)
def run_opt_job(self, label: str, fine: bool = False):
"""
Spawn a geometry optimization job. The initial guess is taken from the `initial_xyz` attribute.
Args:
label (str): The species label.
fine (bool): Whether a fine grid should be used during optimization.
"""
if 'opt' not in self.job_dict[label].keys(): # Check whether opt jobs have been spawned yet.
# we're spawning the first opt job for this species
self.job_dict[label]['opt'] = dict()
self.species_dict[label].initial_xyz = self.species_dict[label].initial_xyz \
or self.species_dict[label].get_xyz(generate=False)
if self.species_dict[label].initial_xyz is None:
raise SpeciesError(f'Cannot execute opt job for {label} without xyz (got None for Species.initial_xyz)')
label_single_spc = None
key = None
if self.species_dict[label].multi_species:
key = 'fine' if fine else 'opt'
if self.output_multi_spc[self.species_dict[label].multi_species].get(key, False):
return
label_single_spc = label
label = [species.label for species in self.species_list
if species.multi_species == self.species_dict[label].multi_species]
self.run_job(label=label,
xyz=self.species_dict[label].initial_xyz if isinstance(label, str) else None,
level_of_theory=self.opt_level,
job_type='opt',
fine=fine)
if label_single_spc is not None and key is not None:
self.output_multi_spc[self.species_dict[label_single_spc].multi_species][key] = True
def run_composite_job(self, label: str):
"""
Spawn a composite job (e.g., CBS-QB3) using 'final_xyz' for species or TS 'label'.
Args:
label (str): The species label.
"""
if not self.composite_method:
raise SchedulerError(f'Cannot run {label} as a composite method without specifying a method.')
if 'composite' not in self.job_dict[label].keys(): # Check whether composite jobs have been spawned yet.
# We're spawning the first composite job for this species.
self.job_dict[label]['composite'] = dict()
xyz = self.species_dict[label].final_xyz if self.species_dict[label].final_xyz is not None \
else self.species_dict[label].initial_xyz
if self.species_dict[label].multi_species:
if self.output_multi_spc[self.species_dict[label].multi_species].get('composite', False):
return
self.output_multi_spc[self.species_dict[label].multi_species]['composite'] = True
label = [species.label for species in self.species_list
if species.multi_species == self.species_dict[label].multi_species]
self.run_job(label=label, xyz=xyz, level_of_theory=self.composite_method, job_type='composite',
fine=self.job_types['fine'])
def run_freq_job(self, label):
"""
Spawn a freq job using 'final_xyz' for species or TS 'label'.
If this was originally a composite job, run an appropriate separate freq job outputting the Hessian.
Args:
label (str): The species label.
"""
if 'freq' not in self.job_dict[label].keys(): # Check whether freq jobs have been spawned yet.
# We're spawning the first freq job for this species.
self.job_dict[label]['freq'] = dict()
if self.species_dict[label].multi_species:
if self.output_multi_spc[self.species_dict[label].multi_species].get('freq', False):
return
self.output_multi_spc[self.species_dict[label].multi_species]['freq'] = True
label = [species.label for species in self.species_list
if species.multi_species == self.species_dict[label].multi_species]
if self.job_types['freq']:
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False) if isinstance(label, str) else None,
level_of_theory=self.freq_level, job_type='freq')
def run_sp_job(self,
label: str,
level: Optional[Level] = None,
conformer: Optional[int] = None,
):
"""
Spawn a single point job using 'final_xyz' for species or TS 'label'.
If the method is MRCI, first spawn a simple CCSD job, and use orbital determination to run the MRCI job.
Args:
label (str): The species label.
level (Level): An alternative level of theory to run at. If ``None``, self.sp_level will be used.
conformer (int): The conformer number.
"""
level = level or self.sp_level
if self.job_types['conf_sp'] and conformer is not None and self.conformer_sp_level != self.conformer_opt_level:
self.run_job(label=label,
xyz=self.species_dict[label].conformers[conformer],
level_of_theory=self.conformer_sp_level,
job_type='conf_sp',
conformer=conformer)
return
# determine_occ(xyz=self.xyz, charge=self.charge)
if level == self.opt_level and not self.composite_method \
and not (level.software == 'xtb' and self.species_dict[label].is_ts) \
and 'paths' in self.output[label] and 'geo' in self.output[label]['paths'] \
and self.output[label]['paths']['geo']:
logger.info(f'Not running an sp job for {label} at {level} since the optimization was done at the '
f'same level of theory. Using the optimization output to parse the sp energy.')
recent_opt_job_name, recent_opt_job = 'opt_a0', None
if 'opt' in self.job_dict[label].keys():
for opt_job_name, opt_job in self.job_dict[label]['opt'].items():
if int(opt_job_name.split('_a')[-1]) > int(recent_opt_job_name.split('_a')[-1]):
recent_opt_job_name, recent_opt_job = opt_job_name, opt_job
if recent_opt_job is not None:
recent_opt_job.rename_output_file()
self.post_sp_actions(label=label,
sp_path=os.path.join(recent_opt_job.local_path_to_output_file),
level=level,
)
# If opt is not in the job dictionary, the likely explanation is this job has been restarted
elif 'geo' in self.output[label]['paths']: # Then just use this path directly
self.post_sp_actions(label=label,
sp_path=self.output[label]['paths']['geo'],
level=level,
)
else:
raise RuntimeError(f'Unable to set the path for the sp job for species {label}')
return
if 'sp' not in self.job_dict[label].keys(): # Check whether single-point energy jobs have been spawned yet.
# We're spawning the first sp job for this species.
self.job_dict[label]['sp'] = dict()
if self.composite_method:
raise SchedulerError(f'run_sp_job() was called for {label} which has a composite method level of theory')
if 'mrci' in level.method:
if self.job_dict[label]['sp']:
# Parse orbital information from the CCSD job, then run MRCI
job0 = None
job_name_0 = 0
for job_name, job in self.job_dict[label]['sp'].items():
if int(job_name.split('_a')[-1]) > job_name_0:
job_name_0 = int(job_name.split('_a')[-1])
job0 = job
with open(job0.local_path_to_output_file, 'r') as f:
lines = f.readlines()
core = val = 0
for line in lines:
if 'NUMBER OF CORE ORBITALS' in line:
core = int(line.split()[4])
elif 'NUMBER OF VALENCE ORBITALS' in line:
val = int(line.split()[4])
if val * core:
break
else:
raise SchedulerError(f'Could not determine number of core and valence orbitals from CCSD '
f'sp calculation for {label}')
self.species_dict[label].occ = val + core # the occupied orbitals are the core and valence orbitals
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False),
level_of_theory=level,
job_type='sp')
else:
# MRCI was requested but no sp job ran for this species, run CCSD first
logger.info(f'running a CCSD job for {label} before MRCI')
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False),
level_of_theory='ccsd/vdz',
job_type='sp')
# Don't fall through to the generic sp spawn below; the appropriate job (CCSD first,
# or MRCI once the orbital count is known) was already spawned above.
return
if self.job_types['sp']:
if self.species_dict[label].multi_species:
if self.output_multi_spc[self.species_dict[label].multi_species].get('sp', False):
return
self.output_multi_spc[self.species_dict[label].multi_species]['sp'] = True
label = [species.label for species in self.species_list
if species.multi_species == self.species_dict[label].multi_species]
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False) if isinstance(label, str) else None,
level_of_theory=level,
job_type='sp',
)
def run_scan_jobs(self, label: str):
"""
Spawn rotor scan jobs using 'final_xyz' for species (or TS).
Args:
label (str): The species label.
"""
if self.job_types['rotors'] and isinstance(self.species_dict[label].rotors_dict, dict):
for i, rotor in self.species_dict[label].rotors_dict.items():
if rotor['scan_path'] and os.path.isfile(rotor['scan_path']):
continue
# Since this function is relevant in multiple cases, all cases are listed for debugging:
# [have not started] success = None, and scan_path = ''
# [first time calculating] success = None, and scan_path = ''
# [converged, good] success = True, and scan_path is file
# [converged, invalidated] success = False, and scan_path is file
# [previous converged, troubleshooting] success = None, and scan_path (previous scan) is file
# [previous converged, lower conformer] success = None, and scan_path (previous scan) is file
# [not a torsion] success = False, and scan_path = ''
if rotor['success'] is not None:
if rotor['scan_path'] and not os.path.isfile(rotor['scan_path']):
# For some reason the output file does not exist.
rotor['success'] = None
else:
continue
torsions = rotor['torsion']
if not isinstance(torsions[0], list):
# Check that a 1D rotor is not linear.
coords = xyz_to_coords_list(self.species_dict[label].get_xyz())
v1 = [c1 - c2 for c1, c2 in zip(coords[torsions[0]], coords[torsions[1]])]
v2 = [c2 - c1 for c1, c2 in zip(coords[torsions[1]], coords[torsions[2]])]
v3 = [c1 - c2 for c1, c2 in zip(coords[torsions[2]], coords[torsions[3]])]
angle1, angle2 = get_angle(v1, v2, units='degs'), get_angle(v2, v3, units='degs')
if any([is_angle_linear(angle, tolerance=0.3) for angle in [angle1, angle2]]):
# This is not a torsional mode, invalidate rotor.
rotor['success'] = False
rotor['invalidation_reason'] = \
f'not a torsional mode (angles = {angle1:.2f}, {angle2:.2f} degrees)'
continue
directed_scan_type = rotor['directed_scan_type'] if 'directed_scan_type' in rotor else ''
                if directed_scan_type and directed_scan_type != 'ess':
# This is a directed scan.
if 'directed_scan' not in self.job_dict[label].keys():
# We're spawning the first brute force scan jobs for this species.
self.job_dict[label]['directed_scan'] = dict()
# Check that this job isn't already running on the server (from a restarted project).
for directed_scan_job in self.job_dict[label]['directed_scan'].values():
if directed_scan_job.torsions == torsions \
and directed_scan_job.job_name in self.running_jobs[label]:
break
else:
if 'cont' in directed_scan_type:
for directed_pivots, job in self.job_dict[label]['directed_scan'].items():
if directed_pivots == rotor['pivots'] \
and self.job_dict[label]['directed_scan'][directed_pivots]:
# The previous job hasn't finished.
break
else:
self.spawn_directed_scan_jobs(label, rotor_index=i)
else:
self.spawn_directed_scan_jobs(label, rotor_index=i)
else:
# This is a "normal" scan (not directed).
if 'scan' not in self.job_dict[label].keys():
# We're spawning the first scan job for this species.
self.job_dict[label]['scan'] = dict()
# Check that this job isn't already running on the server (from a restarted project).
for scan_job in self.job_dict[label]['scan'].values():
if torsions == scan_job.torsions and scan_job.job_name in self.running_jobs[label]:
break
else:
if self.species_dict[label].multi_species:
if self.output_multi_spc[self.species_dict[label].multi_species].get('scan', False):
return
self.output_multi_spc[self.species_dict[label].multi_species]['scan'] = True
label = [species.label for species in self.species_list
if species.multi_species == self.species_dict[label].multi_species]
                    self.run_job(label=label,
                                 xyz=self.species_dict[label].get_xyz(generate=False) if isinstance(label, str) else None,
                                 level_of_theory=self.scan_level,
                                 job_type='scan',
                                 torsions=torsions,
                                 rotor_index=i,
                                 )

    def run_irc_job(self, label, irc_direction='forward'):
        """
        Spawn an IRC job.

        Args:
            label (str): The species label.
            irc_direction (str): The IRC job direction, either 'forward' or 'reverse'.
        """
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False),
level_of_theory=self.irc_level,
job_type='irc',
irc_direction=irc_direction,
)

    def run_orbitals_job(self, label):
        """
        Spawn an orbitals job used for molecular orbital visualization.
        Currently only QChem is supported for printing the orbitals; the output can be visualized using IQMol.

        Args:
            label (str): The species label.
        """
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False),
level_of_theory=self.orbitals_level,
job_type='orbitals',
)

    def run_onedmin_job(self, label):
        """
        Spawn a Lennard-Jones calculation using OneDMin.

        Args:
            label (str): The species label.
        """
if 'onedmin' not in self.ess_settings:
            logger.error('Cannot execute a Lennard-Jones job without the OneDMin software')
elif 'onedmin' not in self.job_dict[label].keys():
self.run_job(label=label,
xyz=self.species_dict[label].get_xyz(generate=False),
job_type='onedmin',
)

    def spawn_post_opt_jobs(self,
                            label: str,
                            job_name: str,
                            ):
        """
        Spawn additional jobs after opt has converged.

        Args:
            label (str): The species label.
            job_name (str): The opt job name (used for differentiating between ``opt`` and ``optfreq`` jobs).
        """
composite = 'composite' in job_name # Whether this was a composite job
# Check whether this was originally a composite method that was troubleshooted as 'opt'.
if not composite and self.composite_method:
self.run_composite_job(label)
return None
# Check whether this is a composite job but wasn't originally so (probably troubleshooted as such).
if composite and not self.composite_method:
self.run_opt_job(label, fine=self.fine_only)
return None
# Spawn IRC if requested and if relevant.
if label in self.output.keys() and self.job_types['irc'] and self.species_dict[label].is_ts:
self.run_irc_job(label=label, irc_direction='forward')
self.run_irc_job(label=label, irc_direction='reverse')
# Spawn freq (or check it if this is a composite job) for polyatomic molecules.
if label in self.output.keys() and self.species_dict[label].number_of_atoms > 1 \
and self.species_dict[label].irc_label is None:
if 'freq' not in job_name and self.job_types['freq']:
# This is either an opt or a composite job (not an optfreq job), spawn freq.
self.run_freq_job(label)
if 'optfreq' in job_name:
# This is an 'optfreq' job type, don't spawn freq (but do check it).
self.check_freq_job(label=label, job=self.job_dict[label]['optfreq'][job_name])
# Spawn sp after an opt (non-composite) job.
if not composite and self.job_types['sp'] and self.species_dict[label].irc_label is None:
self.run_sp_job(label)
# Perceive the Molecule from xyz.
# Useful for TS species where xyz might not be given at the outset to perceive a .mol attribute.
if label in self.output.keys() and self.species_dict[label].mol is None:
self.species_dict[label].mol_from_xyz()
# Spawn scan jobs.
if self.job_types['rotors'] and self.species_dict[label].irc_label is None:
if not self.species_dict[label].rotors_dict:
self.species_dict[label].determine_rotors()
self.run_scan_jobs(label)
# Spawn post sp actions if this is a composite job.
if composite and self.composite_method:
self.post_sp_actions(label=label,
sp_path=self.job_dict[label]['composite'][job_name].local_path_to_output_file)
# Spawn orbitals job.
if self.job_types['orbitals'] and 'orbitals' not in self.job_dict[label].keys():
self.run_orbitals_job(label)
# Spawn onedmin job.
if label in self.output.keys() and self.job_types['onedmin'] and not self.species_dict[label].is_ts:
self.run_onedmin_job(label)
# Spawn bde jobs.
if label in self.output.keys() and self.job_types['bde'] and self.species_dict[label].bdes is not None:
bde_species_list = self.species_dict[label].scissors()
for bde_species in bde_species_list:
if bde_species.label != 'H':
# H was added in main.
logger.info(f'Creating the BDE species {bde_species.label} from the original species {label}')
self.species_list.append(bde_species)
self.species_dict[bde_species.label] = bde_species
self.unique_species_labels.append(bde_species.label)
self.initialize_output_dict(label=bde_species.label)
self.job_dict[bde_species.label] = dict()
self.running_jobs[bde_species.label] = list()
if bde_species.number_of_atoms == 1:
logger.debug(f'Species {bde_species.label} is monoatomic')
# No need to run opt/freq jobs for a monoatomic species, only run sp (or composite if relevant)
if self.composite_method:
self.run_composite_job(bde_species.label)
else:
self.run_sp_job(label=bde_species.label)
# determine the lowest energy conformation of radicals generated in BDE calculations
self.run_conformer_jobs(labels=[species.label for species in bde_species_list
if species.number_of_atoms > 1])
# Check whether any reaction was waiting for this species to spawn TS search jobs.
if label in self.output.keys() and not self.species_dict[label].is_ts:
self.spawn_ts_jobs()

    def spawn_ts_jobs(self):
        """
        Check if any new reaction has all of its reactants and products optimized,
        and if so spawn the respective TSG jobs.
        Don't spawn TS jobs if the multiplicity of the reaction could not be determined.
        """
for rxn in self.rxn_list:
rxn.check_done_opt_r_n_p()
if rxn.done_opt_r_n_p and not rxn.ts_species.tsg_spawned:
if rxn.multiplicity is None:
logger.info(f'Not spawning TS search jobs for reaction {rxn} for which the multiplicity is unknown.')
else:
rxn.ts_species.tsg_spawned = True
tsg_index = 0
for method in self.ts_adapters:
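                        # Use a TS search adapter only if it applies to all reaction families,
                        # or if it is explicitly mapped to this reaction's RMG family.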
if method in all_families_ts_adapters or \
(rxn.family is not None
and rxn.family.label in list(ts_adapters_by_rmg_family.keys())
and method in ts_adapters_by_rmg_family[rxn.family.label]):
self.run_job(job_type='tsg',
job_adapter=method,
reactions=[rxn],
tsg=tsg_index,
)
tsg_index += 1
if all('user guess' in tsg.method for tsg in rxn.ts_species.ts_guesses):
rxn.ts_species.tsg_spawned = True
self.run_conformer_jobs(labels=[rxn.ts_label])

    def spawn_directed_scan_jobs(self,
                                 label: str,
                                 rotor_index: int,
                                 xyz: Optional[dict] = None,
                                 ):
        """
        Spawn directed scan jobs.
        Directed scan types could be one of the following: 'brute_force_sp', 'brute_force_opt', 'cont_opt',
        'brute_force_sp_diagonal', 'brute_force_opt_diagonal', or 'cont_opt_diagonal'.
        Here we treat ``cont`` and ``brute_force`` separately, and also consider the ``diagonal`` keyword.
        The differentiation between ``sp`` and ``opt`` is done in the Job module.

        Args:
            label (str): The species label.
            rotor_index (int): The 0-indexed rotor number (key) in the species.rotors_dict dictionary.
            xyz (dict, optional): The 3D coordinates for a continuous directed scan.

        Raises:
            InputError: If the species directed scan type has an unexpected value,
                        or if ``xyz`` wasn't given for a cont_opt job.
            SchedulerError: If the rotor scan resolution as defined in settings.py is illegal.
        """
increment = rotor_scan_resolution
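        # The increment must divide 360 evenly, e.g., a resolution of 8.0 degrees gives
        # divmod(360, 8.0)[1] == 0.0 (valid), while 7.0 leaves a remainder and is rejected.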
if divmod(360, increment)[1]:
raise SchedulerError(f'The directed scan got an illegal scan resolution of {increment}')
torsions = self.species_dict[label].rotors_dict[rotor_index]['torsion']
directed_scan_type = self.species_dict[label].rotors_dict[rotor_index]['directed_scan_type']
xyz = xyz or self.species_dict[label].get_xyz(generate=True)
        if 'cont' not in directed_scan_type and 'brute' not in directed_scan_type and 'ess' not in directed_scan_type:
            raise InputError(f"directed_scan_type must be 'ess', continuous, or brute force, got: {directed_scan_type}")
if 'ess' in directed_scan_type:
# Allow the ESS to control the scan.
self.run_job(label=label,
xyz=xyz,
level_of_theory=self.scan_level,
job_type='scan',
directed_scan_type=directed_scan_type,
torsions=torsions,
rotor_index=rotor_index,
)
elif 'brute' in directed_scan_type:
# spawn jobs all at once
dihedrals = dict()
for torsion in torsions:
original_dihedral = get_angle_in_180_range(calculate_dihedral_angle(coords=xyz['coords'],
torsion=torsion,
index=0))
dihedrals[tuple(torsion)] = [get_angle_in_180_range(original_dihedral + i * increment) for i in
range(int(360 / increment) + 1)]
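            # Each torsion now maps to a full grid of target dihedrals wrapped into the [-180, 180) range,
            # starting from its original value, with (360 / increment) + 1 points per dimension.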
modified_xyz = xyz
if 'diagonal' not in directed_scan_type:
# increment dihedrals one by one (resulting in an ND scan)
all_dihedral_combinations = list(itertools.product(*[dihedrals[tuple(torsion)] for torsion in torsions]))
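                # Note: itertools.product spawns a full ND grid, i.e., for n torsions with k points
                # each this schedules k**n separate jobs, so brute-force ND scans grow quickly.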
for dihedral_tuple in all_dihedral_combinations:
for torsion, dihedral in zip(torsions, dihedral_tuple):
self.species_dict[label].set_dihedral(scan=torsion,
index=0,
deg_abs=dihedral,
count=False,
xyz=modified_xyz)
modified_xyz = self.species_dict[label].initial_xyz
self.species_dict[label].rotors_dict[rotor_index]['number_of_running_jobs'] += 1
self.run_job(label=label,
xyz=modified_xyz,
level_of_theory=self.scan_level,
job_type='directed_scan',
directed_scan_type=directed_scan_type,
torsions=torsions,
dihedrals=list(dihedral_tuple),
rotor_index=rotor_index,
)
else:
# increment all dihedrals at once (resulting in a unique 1D scan along several changing dimensions)
for i in range(len(dihedrals[tuple(torsions[0])])):
for torsion in torsions:
dihedral = dihedrals[tuple(torsion)][i]
self.species_dict[label].set_dihedral(scan=torsion,
index=0,
deg_abs=dihedral,
count=False,
xyz=modified_xyz)
modified_xyz = self.species_dict[label].initial_xyz
dihedrals = [dihedrals[tuple(torsion)][i] for torsion in torsions]
self.species_dict[label].rotors_dict[rotor_index]['number_of_running_jobs'] += 1
self.run_job(label=label,
xyz=modified_xyz,
level_of_theory=self.scan_level,
job_type='directed_scan',
directed_scan_type=directed_scan_type,
torsions=torsions,
dihedrals=dihedrals,
rotor_index=rotor_index,
)
elif 'cont' in directed_scan_type:
# spawn jobs one by one
if not len(self.species_dict[label].rotors_dict[rotor_index]['cont_indices']):
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'] = [0] * len(torsions)
if not len(self.species_dict[label].rotors_dict[rotor_index]['original_dihedrals']):
self.species_dict[label].rotors_dict[rotor_index]['original_dihedrals'] = \
[f'{calculate_dihedral_angle(coords=xyz["coords"], torsion=scan, index=1):.2f}'
for scan in self.species_dict[label].rotors_dict[rotor_index]['scan']] # stores as str for YAML
rotor_dict = self.species_dict[label].rotors_dict[rotor_index]
torsions = rotor_dict['torsion']
max_num = 360 / increment + 1 # dihedral angles per scan
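            # E.g., with an (assumed default) resolution of 8.0 degrees, max_num = 46 grid points per dimension.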
original_dihedrals = list()
for dihedral in rotor_dict['original_dihedrals']:
                original_dihedrals.append(get_angle_in_180_range(float(dihedral)))
if not any(self.species_dict[label].rotors_dict[rotor_index]['cont_indices']):
# This is the first call for this cont_opt directed rotor, spawn the first job w/o changing dihedrals.
self.run_job(label=label,
xyz=self.species_dict[label].final_xyz,
level_of_theory=self.scan_level,
job_type='directed_scan',
directed_scan_type=directed_scan_type,
torsions=torsions,
dihedrals=original_dihedrals,
rotor_index=rotor_index,
)
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][0] += 1
return
else:
# this is NOT the first call for this cont_opt directed rotor, check that ``xyz`` was given.
if xyz is None:
# xyz is None only at the first time cont opt is spawned, where cont_index is [0, 0,... 0].
raise InputError('xyz argument must be given for a continuous scan job')
# check whether this rotor is done
if self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][-1] == max_num - 1: # 0-indexed
# no more counters to increment, all done!
logger.info(f'Completed all jobs for the continuous directed rotor scan for species {label} '
f'between pivots {rotor_dict["pivots"]}')
self.process_directed_scans(label, rotor_dict['pivots'])
return
modified_xyz = xyz
dihedrals = list()
for index, (original_dihedral, torsion) in enumerate(zip(original_dihedrals, torsions)):
dihedral = original_dihedral + \
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][index] * increment
# Change the original dihedral so we won't end up with two calcs for 180.0, but none for -180.0
# (it only matters for plotting, the geometry is of course the same)
dihedral = get_angle_in_180_range(dihedral)
dihedrals.append(dihedral)
# Only change the dihedrals in the xyz if this torsion corresponds to the current index,
# or if this is a diagonal scan.
# Species.set_dihedral() uses .final_xyz or the given xyz to modify the .initial_xyz
# attribute to the desired dihedral.
self.species_dict[label].set_dihedral(scan=torsion,
index=0,
deg_abs=dihedral,
count=False,
xyz=modified_xyz)
modified_xyz = self.species_dict[label].initial_xyz
self.run_job(label=label,
xyz=modified_xyz,
level_of_theory=self.scan_level,
job_type='directed_scan',
directed_scan_type=directed_scan_type,
torsions=torsions,
dihedrals=dihedrals,
rotor_index=rotor_index,
)
if 'diagonal' in directed_scan_type:
# increment ALL counters for a diagonal scan
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'] = \
[self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][0] + 1] * len(torsions)
else:
# increment the counter sequentially (non-diagonal scan)
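            # This walks cont_indices like an odometer: bump the first torsion's counter until it
            # reaches the end of its grid, then reset it and carry over to the next torsion.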
for index in range(len(torsions)):
if self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][index] < max_num - 1:
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][index] += 1
break
elif (self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][index] == max_num - 1
and index < len(torsions) - 1):
self.species_dict[label].rotors_dict[rotor_index]['cont_indices'][index] = 0

    def process_directed_scans(self, label: str, pivots: Union[List[int], List[List[int]]]):
        """
        Process all directed rotors for a species and check the quality of the scan.

        Args:
            label (str): The species label.
            pivots (Union[List[int], List[List[int]]]): The rotor pivots.
        """
for rotor_dict_index in self.species_dict[label].rotors_dict.keys():
rotor_dict = self.species_dict[label].rotors_dict[rotor_dict_index] # avoid modifying the iterator
if rotor_dict['pivots'] == pivots:
# identified a directed scan (either continuous or brute force, they're treated the same here)
dihedrals = [[float(dihedral) for dihedral in dihedral_string_tuple]
for dihedral_string_tuple in rotor_dict['directed_scan'].keys()]
sorted_dihedrals = sorted(dihedrals)
min_energy = extremum_list([directed_scan_dihedral['energy']
for directed_scan_dihedral in rotor_dict['directed_scan'].values()],
return_min=True)
trshed_points = 0
if rotor_dict['directed_scan_type'] == 'ess':
# parse the single output file
results = parser.parse_nd_scan_energies(path=rotor_dict['scan_path'])[0]
else:
results = {'directed_scan_type': rotor_dict['directed_scan_type'],
'scans': rotor_dict['scan'],
'directed_scan': rotor_dict['directed_scan']}
for dihedral_list in sorted_dihedrals:
dihedrals_key = tuple(f'{dihedral:.2f}' for dihedral in dihedral_list)
dihedral_dict = results['directed_scan'][dihedrals_key]
if dihedral_dict['trsh']:
trshed_points += 1
if dihedral_dict['energy'] is not None:
dihedral_dict['energy'] -= min_energy # set 0 at the minimal energy
folder_name = 'rxns' if self.species_dict[label].is_ts else 'Species'
rotor_yaml_file_path = os.path.join(self.project_directory, 'output', folder_name, label, 'rotors',
f'{pivots}_{rotor_dict["directed_scan_type"]}.yml')
plotter.save_nd_rotor_yaml(results, path=rotor_yaml_file_path)
self.species_dict[label].rotors_dict[rotor_dict_index]['scan_path'] = rotor_yaml_file_path
if trshed_points:
logger.warning(f'Directed rotor scan for species {label} between pivots {rotor_dict["pivots"]} '
f'had {trshed_points} points that required optimization troubleshooting.')
rotor_path = os.path.join(self.project_directory, 'output', folder_name, label, 'rotors')
if len(results['scans']) == 1:
plotter.plot_1d_rotor_scan(
results=results,
path=rotor_path,
scan=rotor_dict['scan'][0],
label=label,
original_dihedral=self.species_dict[label].rotors_dict[rotor_dict_index]['original_dihedrals'],
)
elif len(results['scans']) == 2:
plotter.plot_2d_rotor_scan(results=results, path=rotor_path)
else:
logger.debug('Not plotting ND rotors with N > 2')

    def parse_composite_geo(self,
                            label: str,
                            job: 'JobAdapter',
                            ) -> bool:
        """
        Check that a 'composite' job converged successfully, and parse the geometry into `final_xyz`.
        Also checks (QA) that no imaginary frequencies were assigned for stable species,
        and that exactly one imaginary frequency was assigned for a TS.
        Returns ``True`` if the job converged successfully; otherwise troubleshoots and returns ``False``.

        Args:
            label (str): The species label.
            job (JobAdapter): The composite job object.

        Returns:
            bool: Whether the job converged successfully.
        """
logger.debug(f'parsing composite geo for {job.job_name}')
freq_ok = False
if job.job_status[1]['status'] == 'done':
self.species_dict[label].final_xyz = parser.parse_xyz_from_file(path=job.local_path_to_output_file)
self.output[label]['job_types']['composite'] = True
self.output[label]['job_types']['opt'] = True
self.output[label]['job_types']['sp'] = True
if self.job_types['fine']:
self.output[label]['job_types']['fine'] = True # all composite jobs are fine if fine was asked for
            self.output[label]['paths']['composite'] = job.local_path_to_output_file
if self.composite_method is not None:
self.species_dict[label].opt_level = self.composite_method.simple()
rxn_str = ''
if self.species_dict[label].is_ts:
rxn_str = f' of reaction {self.species_dict[label].rxn_label}' \
if self.species_dict[label].rxn_label is not None else ''
logger.info(f'\nOptimized geometry for {label}{rxn_str} at {job.level.simple()}:\n'
f'{xyz_to_str(xyz_dict=self.species_dict[label].final_xyz)}\n')
plotter.save_geo(species=self.species_dict[label], project_directory=self.project_directory)
if not job.is_ts:
plotter.draw_structure(species=self.species_dict[label],
project_directory=self.project_directory)
else:
# for TSs, only use `draw_3d()`, not `show_sticks()` which gets connectivity wrong:
plotter.draw_structure(species=self.species_dict[label],
project_directory=self.project_directory,
method='draw_3d')
frequencies = parser.parse_frequencies(job.local_path_to_output_file, job.job_adapter)
freq_ok = self.check_negative_freq(label=label, job=job, vibfreqs=frequencies)
if freq_ok:
# Update restart dictionary and save a restart file:
self.save_restart_dict()
success = True # run freq / scan jobs on this optimized geometry
if not self.species_dict[label].is_ts:
is_isomorphic = self.species_dict[label].check_xyz_isomorphism(
allow_nonisomorphic_2d=self.allow_nonisomorphic_2d)
if is_isomorphic:
self.output[label]['isomorphism'] += 'composite passed isomorphism check; '
else:
self.output[label]['isomorphism'] += 'composite did not pass isomorphism check; '
success &= is_isomorphic
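                    # ``success`` stays True only if the optimized geometry is also isomorphic
                    # to the species' 2D graph representation.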
return success
elif not self.species_dict[label].is_ts and self.trsh_ess_jobs:
self.troubleshoot_negative_freq(label=label, job=job)
if job.job_status[1]['status'] != 'done' or (not freq_ok and not self.species_dict[label].is_ts):
self.troubleshoot_ess(label=label, job=job, level_of_theory=job.level)
return False # return ``False``, so no freq / scan jobs are initiated for this unoptimized geometry

    def parse_opt_e_elect(self,
                          label: str,
                          job: 'JobAdapter',
                          ) -> None:
        """
        Parse the electronic energy for an 'opt' or 'optfreq' job if it converged successfully.

        Args:
            label (str): The species label.
            job (JobAdapter): The optimization job object.
        """
multi_species = any(spc.multi_species == label for spc in self.species_list)
if multi_species:
for spc in self.species_list:
if spc.multi_species == label:
self.species_dict[spc.label].e_elect = parser.parse_e_elect(path=self.multi_species_path_dict[spc.label])
self.save_e_elect(spc.label)
else:
            e_elect_value = parser.parse_e_elect(path=job.local_path_to_xyz or job.local_path_to_output_file) \
                if label in self.species_dict.keys() else None
            self.species_dict[label].e_elect = e_elect_value
            self.save_e_elect(label)

    def parse_opt_geo(self,
                      label: str,
                      job: 'JobAdapter',
                      ) -> bool:
        """
        Check that an 'opt' or 'optfreq' job converged successfully, and parse the geometry into `final_xyz`.
        If the job is 'optfreq', also checks (QA) that no imaginary frequencies were assigned for stable species,
        and that exactly one imaginary frequency was assigned for a TS.
        Returns ``True`` if the job (or both jobs) converged successfully; otherwise troubleshoots opt and returns ``False``.

        Args:
            label (str): The species label.
            job (JobAdapter): The optimization job object.

        Returns:
            bool: Whether the job converged successfully.
        """
success = False
multi_species_opt_xyzs = dict()
logger.debug(f'parsing opt geo for {job.job_name}')
if job.job_status[1]['status'] == 'done':
multi_species = any(spc.multi_species == label for spc in self.species_list)
species_labels = [spc.label for spc in self.species_list if spc.multi_species == label]\
if multi_species else list()
opt_xyz = parser.parse_xyz_from_file(path=job.local_path_to_xyz or job.local_path_to_output_file) \
if label in self.species_dict.keys() else dict()
if multi_species:
for spc in self.species_list:
if spc.multi_species == label:
multi_species_opt_xyzs[spc.label] = parser.parse_xyz_from_file(path=self.multi_species_path_dict[spc.label])
            if not job.fine and self.job_types['fine'] \
                    and job.level.method_type != 'wavefunction' \
                    and self.species_dict[label].irc_label is None:
# Run opt again using a finer grid.
# Save the optimized geometry as ``initial_xyz``, since trsh looks there.
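                # Note: 'fine' here presumably refers to a tighter DFT integration grid;
                # wavefunction methods have no such grid, hence the method_type exclusion above.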
if multi_species:
for spc in self.species_list:
if spc.multi_species == label:
spc.initial_xyz = multi_species_opt_xyzs[spc.label]
else:
self.species_dict[label].initial_xyz = opt_xyz
self.run_job(label=species_labels if multi_species else label,
xyz=opt_xyz if not multi_species else None,
level_of_theory=job.level,
job_type='opt',
fine=True,
)
else:
success = True
if multi_species:
for spc in self.species_list:
if spc.multi_species == label:
self.species_dict[spc.label].final_xyz = multi_species_opt_xyzs[spc.label]
self.post_opt_geo_work(spc.label, job)
else:
self.species_dict[label].final_xyz = opt_xyz
self.post_opt_geo_work(label, job)
if 'optfreq' in job.job_name:
self.check_freq_job(label, job)
self.save_restart_dict()
if not multi_species:
if not self.species_dict[label].is_ts:
plotter.draw_structure(species=self.species_dict[label], project_directory=self.project_directory)
if self.species_dict[label].irc_label is not None:
self.check_irc_species(label=label)
return False
is_isomorphic = self.species_dict[label].check_xyz_isomorphism(
allow_nonisomorphic_2d=self.allow_nonisomorphic_2d)
if is_isomorphic:
if 'opt passed isomorphism check' not in self.output[label]['isomorphism']:
self.output[label]['isomorphism'] += 'opt passed isomorphism check; '
else:
self.output[label]['isomorphism'] += 'opt did not pass isomorphism check; '
success &= is_isomorphic
else:
# for TSs, only use `draw_3d()`, not `show_sticks()` which gets connectivity wrong:
plotter.draw_structure(species=self.species_dict[label],
project_directory=self.project_directory,
method='draw_3d')
elif self.trsh_ess_jobs:
self.troubleshoot_opt_jobs(label=label)
return success

    def post_opt_geo_work(self,
                          spc_label: str,
                          job: 'JobAdapter'):
        """
        A few steps to finish up after an opt job has converged.

        Args:
            spc_label (str): The species label.
            job (JobAdapter): The optimization job object.
        """
self.output[spc_label]['job_types']['opt'] = True
if self.job_types['fine']:
self.output[spc_label]['job_types']['fine'] = True
self.species_dict[spc_label].opt_level = self.opt_level.simple()
plotter.save_geo(species=self.species_dict[spc_label], project_directory=self.project_directory)
if self.species_dict[spc_label].is_ts:
rxn_str = f' of reaction {self.species_dict[spc_label].rxn_label}' \
if self.species_dict[spc_label].rxn_label is not None else ''
else:
rxn_str = ''
logger.info(f'\nOptimized geometry for {spc_label}{rxn_str} at {job.level.simple()}:\n'
f'{xyz_to_str(self.species_dict[spc_label].final_xyz)}\n')
self.output[spc_label]['paths']['geo'] = job.local_path_to_output_file # will be overwritten with freq

    def check_freq_job(self,
                       label: str,
                       job: 'JobAdapter',
                       ):
        """
        Check that a freq job converged successfully. Also checks (QA) that no imaginary frequencies were assigned for
        stable species, and that exactly one imaginary frequency was assigned for a TS.

        Args:
            label (str): The species label.
            job (JobAdapter): The frequency job object instance.
        """
freq_ok, switch_ts = False, False
wrong_freq_message = 'wrong number of negative frequencies; '
if job.job_status[1]['status'] == 'done':
if not os.path.isfile(job.local_path_to_output_file):
raise SchedulerError('Called check_freq_job with no output file')
vibfreqs = parser.parse_frequencies(path=str(job.local_path_to_output_file), software=job.job_adapter)
freq_ok = self.check_negative_freq(label=label, job=job, vibfreqs=vibfreqs)
if freq_ok:
# Copy the frequency file to the species / TS output folder.
folder_name = 'rxns' if self.species_dict[label].is_ts else 'Species'
freq_path = os.path.join(self.project_directory, 'output', folder_name, label, 'geometry', 'freq.out')
safe_copy_file(source=job.local_path_to_output_file, destination=freq_path)
# Set species.polarizability.
                polarizability = parser.parse_polarizability(job.local_path_to_output_file)
                if polarizability is not None:
                    self.species_dict[label].transport_data.polarizability = (polarizability, 'angstroms^3')
                    if self.species_dict[label].transport_data.comment:
                        self.species_dict[label].transport_data.comment += \
                            f'\nPolarizability calculated at the {self.freq_level.simple()} level of theory'
                    else:
                        self.species_dict[label].transport_data.comment = \
                            f'Polarizability calculated at the {self.freq_level.simple()} level of theory'
if self.species_dict[label].is_ts:
if self.species_dict[label].rxn_index in self.rxn_dict.keys():
check_ts(reaction=self.rxn_dict[self.species_dict[label].rxn_index],
job=job,
checks=['freq'],
skip_nmd=self.skip_nmd,
)
if self.species_dict[label].ts_checks['NMD'] is False:
logger.info(f'TS {label} did not pass the normal mode displacement check. '
f'Status is:\n{self.species_dict[label].ts_checks}\n'
f'Searching for a better TS conformer...')
self.switch_ts(label)
switch_ts = True
if wrong_freq_message in self.output[label]['warnings']:
                self.output[label]['warnings'] = self.output[label]['warnings'].replace(wrong_freq_message, '')
elif not self.species_dict[label].is_ts and self.trsh_ess_jobs:
# Only trsh neg freq here for non TS species, trsh TS species is done in check_negative_freq().
self.troubleshoot_negative_freq(label=label, job=job)
if not freq_ok:
self.output[label]['warnings'] += wrong_freq_message
if job.job_status[1]['status'] != 'done' or (not freq_ok and not self.species_dict[label].is_ts):
self.troubleshoot_ess(label=label, job=job, level_of_theory=job.level)
if (job.job_status[1]['status'] == 'done' and freq_ok and not switch_ts
and species_has_sp(self.output[label], self.species_dict[label].yml_path)):
self.check_rxn_e0_by_spc(label)

    def check_negative_freq(self,
                            label: str,
                            job: 'JobAdapter',
                            vibfreqs: Union[list, np.ndarray],
                            ) -> bool:
        """
        A helper function for determining the number of negative frequencies. Also logs appropriate errors.
        Returns ``True`` if the number of negative frequencies is as expected, ``False`` otherwise.

        Args:
            label (str): The species label.
            job (JobAdapter): The frequency (or composite) job object.
            vibfreqs (Union[list, np.ndarray]): The vibrational frequencies.

        Returns:
            bool: Whether the number of negative frequencies is as expected.
        """
        neg_freqs = [freq for freq in vibfreqs if freq < 0]
if not self.species_dict[label].is_ts:
if len(neg_freqs) != 0:
logger.error(f'Species {label} has {len(neg_freqs)} imaginary frequencies ({neg_freqs}), '
f'should have exactly 0.')
if f'{len(neg_freqs)} imaginary freq for' not in self.output[label]['warnings']:
self.output[label]['warnings'] += f'Warning: {len(neg_freqs)} imaginary freq for stable species ' \
f'({neg_freqs}); '
return False
else:
self.output[label]['job_types']['freq'] = True
self.output[label]['paths']['geo'] = job.local_path_to_output_file
self.output[label]['paths']['freq'] = job.local_path_to_output_file
if not self.testing:
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()
return True
else:
# This is a TS. Assign the imaginary frequencies to the respective TSGuess.
tsg = None
for tsg in self.species_dict[label].ts_guesses:
if tsg.conformer_index == self.species_dict[label].chosen_ts:
tsg.imaginary_freqs = neg_freqs
break
if tsg is not None and not check_imaginary_frequencies(tsg.imaginary_freqs):
# Imaginary frequencies are problematic, try choosing a different TSGuess, and optimize it.
add_text = f' major imaginary frequency between {LOWEST_MAJOR_TS_FREQ} and {HIGHEST_MAJOR_TS_FREQ}.' \
if len(neg_freqs) == 1 and (neg_freqs[0] < LOWEST_MAJOR_TS_FREQ
or neg_freqs[0] > HIGHEST_MAJOR_TS_FREQ) else ''
logger.error(f'TS {label} has {len(neg_freqs)} imaginary frequencies ({neg_freqs}), '
f'should have exactly 1{add_text}.')
if f'{len(neg_freqs)} imaginary freqs for' not in self.output[label]['warnings']:
self.output[label]['warnings'] += f'Warning: {len(neg_freqs)} imaginary freqs for TS ({neg_freqs}); '
logger.info(f'TS {label} did not pass the negative frequency check. '
f'Status is:\n{self.species_dict[label].ts_checks}\n'
f'Searching for a better TS conformer...')
self.switch_ts(label=label)
return False
else:
logger.info(f'TS {label} has exactly one imaginary frequency: {neg_freqs[0]}')
self.output[label]['info'] += f'Imaginary frequency: {neg_freqs[0] if len(neg_freqs) == 1 else neg_freqs}; '
self.output[label]['job_types']['freq'] = True
self.output[label]['paths']['geo'] = job.local_path_to_output_file
self.output[label]['paths']['freq'] = job.local_path_to_output_file
plotter.save_conformers_file(
project_directory=self.project_directory,
label=label,
xyzs=[tsg.opt_xyz for tsg in self.species_dict[label].ts_guesses],
level_of_theory=self.ts_guess_level,
multiplicity=self.species_dict[label].multiplicity,
charge=self.species_dict[label].charge,
is_ts=True,
energies=[tsg.energy for tsg in self.species_dict[label].ts_guesses],
ts_methods=[f'{tsg.method} '
f'{tsg.method_direction if tsg.method_direction is not None else ""} '
f'{tsg.method_index if tsg.method_index is not None else ""} '
for tsg in self.species_dict[label].ts_guesses],
im_freqs=[tsg.imaginary_freqs for tsg in self.species_dict[label].ts_guesses]
if any(tsg.imaginary_freqs is not None for tsg in self.species_dict[label].ts_guesses) else None,
before_optimization=False,
)
if not self.testing:
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()
# Set the ts_checks attribute of the TS species:
self.species_dict[label].ts_checks['freq'] = True
return True

    def check_rxn_e0_by_spc(self, label: str):
        """
        Check the E0 (electronic energy + ZPE) of reactions related to a specific species.
        Requires all opt + freq computations to have converged for all species (and the TS) participating in each reaction.

        Args:
            label (str): A label representing a species.
        """
for rxn in self.rxn_list:
labels = rxn.reactants + rxn.products + [rxn.ts_label]
if label in labels and rxn.ts_species.ts_checks['E0'] is None \
and all([species_has_sp_and_freq(output_dict, self.species_dict[spc_label].yml_path)
for spc_label, output_dict in self.output.items() if spc_label in labels]):
check_ts(reaction=rxn,
checks=['energy'],
species_dict=self.species_dict,
project_directory=self.project_directory,
kinetics_adapter=self.kinetics_adapter,
output=self.output,
sp_level=self.sp_level if not self.composite_method else self.composite_method,
freq_scale_factor=self.freq_scale_factor,
verbose=True,
)
if rxn.ts_species.ts_checks['E0'] is False:
logger.info(f'TS {rxn.ts_species.label} of reaction {rxn.label} did not pass the E0 check.\n'
f'Searching for a better TS conformer...\n')
self.switch_ts(rxn.ts_label)

    def switch_ts(self, label: str):
        """
        Try the next optimized TS guess in line if a previous TS guess was found to be wrong.

        Args:
            label (str): The TS species label.
        """
logger.info(f'Switching a TS guess for {label}...')
self.determine_most_likely_ts_conformer(label=label) # Look for a different TS guess.
self.delete_all_species_jobs(label=label) # Delete other currently running jobs for this TS.
        self.output[label]['paths']['geo'] = self.output[label]['paths']['freq'] = \
            self.output[label]['paths']['sp'] = self.output[label]['paths']['composite'] = ''
freq_path = os.path.join(self.project_directory, 'output', 'rxns', label, 'geometry', 'freq.out')
if os.path.isfile(freq_path):
os.remove(freq_path)
self.species_dict[label].populate_ts_checks() # Restart the TS checks dict.
if not self.species_dict[label].ts_guesses_exhausted and self.species_dict[label].chosen_ts is not None:
logger.info(f'Optimizing species {label} again using a different TS guess: '
f'conformer {self.species_dict[label].chosen_ts}')
if not self.composite_method:
self.run_opt_job(label, fine=self.fine_only)
else:
self.run_composite_job(label)

    def check_sp_job(self,
                     label: str,
                     job: 'JobAdapter',
                     ):
        """
        Check that a single-point energy job converged successfully.

        Args:
            label (str): The species label.
            job (JobAdapter): The single point job object.
        """
if 'mrci' in self.sp_level.method and job.level is not None and 'mrci' not in job.level.method:
            # This is a CCSD job that ran before MRCI. Spawn the MRCI job.
self.run_sp_job(label)
elif job.job_status[1]['status'] == 'done':
            self.post_sp_actions(label,
                                 sp_path=job.local_path_to_output_file,
                                 level=job.level,
                                 )
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()
if self.species_dict[label].number_of_atoms == 1:
# save the geometry from the sp job for monoatomic species for which no opt/freq jobs will be spawned
self.output[label]['paths']['geo'] = job.local_path_to_output_file
else:
self.troubleshoot_ess(label=label,
job=job,
level_of_theory=job.level,
)

    def post_sp_actions(self,
                        label: str,
                        sp_path: str,
                        level: Optional[Level] = None,
                        ):
        """
        Perform post-sp actions.

        Args:
            label (str): The species label.
            sp_path (str): The path to 'output.out' for the single point job.
            level (Level, optional): The level of theory used for the sp job.
        """
original_sp_path = self.output[label]['paths']['sp'] if 'sp' in self.output[label]['paths'] else None
self.output[label]['paths']['sp'] = sp_path
if self.sp_level is not None and 'ccsd' in self.sp_level.method:
self.species_dict[label].t1 = parser.parse_t1(self.output[label]['paths']['sp'])
zpe_scale_factor = 0.99 if (self.composite_method is not None and 'cbs-qb3' in self.composite_method.method) \
else 1.0
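        # The 0.99 ZPE scaling factor above is part of the CBS-QB3 model chemistry definition,
        # which scales the B3LYP harmonic ZPE when assembling the composite energy.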
self.species_dict[label].e_elect = parser.parse_e_elect(self.output[label]['paths']['sp'],
zpe_scale_factor=zpe_scale_factor)
if self.species_dict[label].t1 is not None:
txt = ''
if self.species_dict[label].t1 > 0.02:
txt += ". Looks like it should be treated using a multireference single-point energy method."
elif self.species_dict[label].t1 > 0.015:
txt += ". It might have multireference characteristic."
logger.info(f'Species {label} has a T1 diagnostic parameter of {self.species_dict[label].t1}{txt}')
self.output[label]['info'] += f'T1 = {self.species_dict[label].t1}; '
if self.sp_level is not None and self.sp_level.solvation_scheme_level is not None:
# a complex solvation correction behavior was requested for the single-point energy value
if not self.output[label]['job_types']['sp']:
# this is the first "original" sp job, spawn two more at the sp_level.solvation_scheme_level level,
# with and without solvation corrections
solvation_sp_level = self.sp_level.solvation_scheme_level.copy()
solvation_sp_level.solvation_method = self.sp_level.solvation_method
solvation_sp_level.solvent = self.sp_level.solvent
self.run_sp_job(label=label, level=solvation_sp_level)
self.run_sp_job(label=label, level=self.sp_level.solvation_scheme_level)
else:
                # This is one of the additional sp jobs spawned above.
if level is not None and level.solvation_method is not None:
self.output[label]['paths']['sp_sol'] = sp_path
else:
self.output[label]['paths']['sp_no_sol'] = sp_path
self.output[label]['paths']['sp'] = original_sp_path # restore the original path
if species_has_freq(self.output[label], self.species_dict[label].yml_path):
self.check_rxn_e0_by_spc(label)
if self.report_e_elect:
self.save_e_elect(label)
# set *at the end* to differentiate between sp jobs when using complex solvation corrections
self.output[label]['job_types']['sp'] = True

    def spawn_post_irc_jobs(self,
                            label: str,
                            job: 'JobAdapter',
                            ):
        """
        Spawn additional jobs after an IRC job has converged.

        Args:
            label (str): The species label.
            job (JobAdapter): The IRC job object.
        """
        self.output[label]['paths']['irc'].append(job.local_path_to_output_file)
        index = 1
        if len(self.output[label]['paths']['irc']) == 2:
            index = 2
            self.output[label]['job_types']['irc'] = True
            # Only generate the trajectory animation once both IRC directions are available.
            plotter.save_irc_traj_animation(irc_f_path=self.output[label]['paths']['irc'][0],
                                            irc_r_path=self.output[label]['paths']['irc'][1],
                                            out_path=os.path.join(self.project_directory, 'output',
                                                                  'rxns', label, 'irc_traj.gjf'))
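        # index is 1 for the first converged IRC direction and 2 for the second; it is only
        # used below to give the resulting IRC species a unique label.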
irc_label = self.add_label_to_unique_species_labels(label=f'IRC_{label}_{index}')
irc_spc = ARCSpecies(label=irc_label,
xyz=parser.parse_xyz_from_file(job.local_path_to_output_file),
irc_label=label,
compute_thermo=False,
multiplicity=job.species[0].multiplicity,
charge=job.species[0].charge,
)
if self.species_dict[label].irc_label is None:
self.species_dict[label].irc_label = irc_spc.label
else:
self.species_dict[label].irc_label += f' {irc_spc.label}'
self.species_list.append(irc_spc)
self.species_dict[irc_spc.label] = irc_spc
self.initialize_output_dict(label=irc_spc.label)
self.run_job(label=irc_spc.label,
xyz=self.species_dict[irc_spc.label].get_xyz(),
level_of_theory=self.opt_level if not self.composite_method else self.freq_level,
job_type='opt',
fine=False,
)

    def add_label_to_unique_species_labels(self, label: str) -> str:
        """
        Adds a label to self.unique_species_labels.
        Modifies the label if it is not unique.

        Args:
            label (str): A species label.

        Returns:
            str: The modified species label.
        """
unique_label, i = label, 0
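        # E.g., if 'IRC_TS1_1' is already taken, this yields 'IRC_TS1_1_0', then 'IRC_TS1_1_1', etc.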
while unique_label in self.unique_species_labels:
unique_label = f'{label}_{i}'
i += 1
self.unique_species_labels.append(unique_label)
return unique_label

    def check_irc_species(self, label: str):
        """
        Check that the optimized geometries of the two species created from a TS's two IRC runs make sense.

        Args:
            label (str): The label of one of the optimized IRC-resulting species.
        """
ts_label = self.species_dict[label].irc_label
if len(self.output[ts_label]['paths']['irc']) == 2:
irc_species_labels = self.species_dict[ts_label].irc_label.split()
if all(self.output[irc_label]['paths']['geo'] for irc_label in irc_species_labels):
check_irc_species_and_rxn(xyz_1=self.output[irc_species_labels[0]]['paths']['geo'],
xyz_2=self.output[irc_species_labels[1]]['paths']['geo'],
rxn=self.rxn_dict.get(self.species_dict[ts_label].rxn_index, None),
)

    def check_scan_job(self,
                       label: str,
                       job: 'JobAdapter'):
        """
        Check that a rotor scan job converged successfully. Also checks (QA) whether the scan is relatively "smooth",
        and whether the optimized geometry indeed represents the minimum energy conformer.
        Recommends whether to use this rotor via the 'successful_rotors' and 'unsuccessful_rotors' attributes.

        Args:
            label (str): The species label.
            job (JobAdapter): The rotor scan job object.
        """
# An 'Internal coordinate error' cannot be handled by troubleshooting, so we don't even try.
# It is usually related to bond or angle changes which mess up the internal coordinates during the scan.
invalidate, actions, energies, angles = False, list(), list(), list()
invalidation_reason, message = '', ''
if job.job_status[1]['status'] != 'done':
if job.job_status[1]['error'] == 'Internal coordinate error':
invalidate = True
invalidation_reason = 'Internal coordinate error; '
else:
self.troubleshoot_ess(label=label,
job=job,
level_of_theory=job.level)
return None
if job.rotor_index not in self.species_dict[label].rotors_dict.keys():
raise SchedulerError(f'Could not match rotor {job.rotor_index} of species {label} '
f'with pivots {self.species_dict[label].rotors_dict[job.rotor_index]["pivots"]} '
f'to any of the existing rotors in the species.\n'
f'The rotors dict of {label} is:\n{pprint.pformat(self.species_dict[label].rotors_dict)}')
if self.species_dict[label].rotors_dict[job.rotor_index]['dimensions'] == 1:
# This is a 1D scan.
# Read energy profile (in kJ/mol), it may be used in the troubleshooting.
energies, angles = parser.parse_1d_scan_energies(
path=job.local_path_to_output_file,
initial_angle=calculate_dihedral_angle(
coords=self.species_dict[label].get_xyz(),
torsion=self.species_dict[label].rotors_dict[job.rotor_index]['torsion']))
self.species_dict[label].rotors_dict[job.rotor_index]['original_dihedrals'] = \
[calculate_dihedral_angle(coords=job.xyz, torsion=job.torsions[0], index=0, units='degs')]
if energies is None:
invalidate = True
invalidation_reason = 'Could not read energies'
message = f'Energies from rotor scan of {label} of pivots ' \
f'{self.species_dict[label].rotors_dict[job.rotor_index]["pivots"]} could not ' \
f'be read. Invalidating rotor.'
logger.error(message)
elif len(energies) > 5:
trajectory = parser.parse_1d_scan_coords(path=job.local_path_to_output_file) \
if self.species_dict[label].is_ts else None
invalidate, invalidation_reason, message, actions = scan_quality_check(
label=label,
pivots=self.species_dict[label].rotors_dict[job.rotor_index]['pivots'],
energies=energies,
used_methods=self.species_dict[label].rotors_dict[job.rotor_index]['trsh_methods'],
log_file=job.local_path_to_output_file,
species=self.species_dict[label],
preserve_params=self.species_dict[label].preserve_param_in_scan,
trajectory=trajectory,
original_xyz=self.species_dict[label].final_xyz,
)
if len(list(actions.keys())) \
and 'pivTS' not in self.species_dict[label].rotors_dict[job.rotor_index]['invalidation_reason'] \
and self.trsh_ess_jobs:
# The rotor scan is problematic (and does not block a TS reaction zone), troubleshooting is required.
logger.info(f'Trying to troubleshoot rotor '
f'{self.species_dict[label].rotors_dict[job.rotor_index]["pivots"]} '
f'of species {label} ...')
                    # Try to troubleshoot the rotor; sometimes troubleshooting cannot yield a solution.
                    # The ``actions`` returned by scan_quality_check() are not the actual actions applied;
                    # they are post-processed by trsh_scan_job(). If troubleshooting fails, the actual
                    # actions will be an empty dict, indicating an invalid rotor.
trsh_success, actions = self.troubleshoot_scan_job(job=job, methods=actions)
if not trsh_success:
# Detailed reasons are logged in the troubleshoot_scan_job().
invalidation_reason += ' But unable to propose troubleshooting methods.'
else:
# Record actions, only if the method is valid.
self.species_dict[label].rotors_dict[job.rotor_index]['trsh_methods'].append(actions)
if invalidate:
self.species_dict[label].rotors_dict[job.rotor_index]['success'] = False
self.species_dict[label].rotors_dict[job.rotor_index]['invalidation_reason'] = invalidation_reason
else:
self.species_dict[label].rotors_dict[job.rotor_index]['success'] = True
self.species_dict[label].rotors_dict[job.rotor_index]['symmetry'] = determine_rotor_symmetry(
label=label,
pivots=self.species_dict[label].rotors_dict[job.rotor_index]['pivots'],
rotor_path=job.local_path_to_output_file)[0]
logger.info(
f'Rotor scan {self.species_dict[label].rotors_dict[job.rotor_index]["scan"]} between pivots '
f'{self.species_dict[label].rotors_dict[job.rotor_index]["pivots"]} for {label} '
f'has symmetry {self.species_dict[label].rotors_dict[job.rotor_index]["symmetry"]}')
else:
# This is an ND scan, pass for now as it is currently not used for computing Q.
pass
if invalidate:
self.species_dict[label].rotors_dict[job.rotor_index]['success'] = None if len(actions) else False
# Save the path and invalidation reason for debugging and tracking the file.
# If ``success`` is None, it means that the job is being troubleshooted.
self.species_dict[label].rotors_dict[job.rotor_index]['scan_path'] = job.local_path_to_output_file
self.species_dict[label].rotors_dict[job.rotor_index]['invalidation_reason'] += invalidation_reason
# If energies were obtained, draw the scan curve.
if energies is not None and len(energies) and angles is not None and len(angles):
folder_name = 'rxns' if job.is_ts else 'Species'
rotor_path = os.path.join(self.project_directory, 'output', folder_name, job.species_label, 'rotors')
plotter.plot_1d_rotor_scan(angles=angles,
energies=energies,
path=rotor_path,
scan=torsions_to_scans(job.torsions[0]),
comment=message,
label=label,
original_dihedral=self.species_dict[label].rotors_dict[job.rotor_index][
'original_dihedrals'],
)
self.save_restart_dict()

    def check_directed_scan(self, label, pivots, scan, energies):
        """
        Checks (QA) whether the directed scan is relatively "smooth",
        and whether the optimized geometry indeed represents the minimum energy conformer.
        Recommends whether or not to use this rotor using the 'successful_rotors' and 'unsuccessful_rotors' attributes.
        This method differs from check_directed_scan_job(), since here we consider the entire scan.

        Args:
            label (str): The species label.
            pivots (List[List[int]]): The rotor pivots.
            scan (List[int]): The four atoms defining the dihedral.
            energies (List[float]): The rotor scan energies in kJ/mol.

        Todo:
            - Not used!!
            - adjust to ND, merge with check_directed_scan_job (this one isn't being called)
        """
        # Check the scan quality; if the scan is problematic, try to troubleshoot it.
        invalidate, invalidation_reason, message, actions = scan_quality_check(label=label,
                                                                               pivots=pivots,
                                                                               energies=energies)
if actions:
# the rotor scan is problematic, troubleshooting is required
if 'change conformer' in actions:
# a lower conformation was found
deg_increment = actions[1]
self.species_dict[label].set_dihedral(scan=scan, index=1, deg_increment=deg_increment)
is_isomorphic = self.species_dict[label].check_xyz_isomorphism(
allow_nonisomorphic_2d=self.allow_nonisomorphic_2d,
xyz=self.species_dict[label].initial_xyz)
if is_isomorphic:
self.delete_all_species_jobs(label)
# Remove all completed rotor calculation information
for rotor_dict in self.species_dict[label].rotors_dict.values():
# don't initialize all parameters, e.g., `times_dihedral_set` needs to remain as is
rotor_dict['scan_path'] = ''
rotor_dict['invalidation_reason'] = ''
rotor_dict['success'] = None
rotor_dict.pop('symmetry', None)
# re-run opt (or composite) on the new initial_xyz with the desired dihedral
if not self.composite_method:
self.run_opt_job(label, fine=self.fine_only)
else:
self.run_composite_job(label)
else:
# The conformer is wrong, and changing the dihedral resulted in a non-isomorphic species.
self.output[label]['errors'] += f'A lower conformer was found for {label} via a torsion mode, ' \
f'but it is not isomorphic with the 2D graph representation ' \
f'{self.species_dict[label].mol.copy(deep=True).to_smiles()}. ' \
f'Not calculating this species.'
self.output[label]['conformers'] += 'Unconverged'
self.output[label]['convergence'] = False
else:
logger.error(f'Directed scan for species {label} for pivots {pivots} failed with: '
f'{invalidation_reason}. Currently rotor troubleshooting methods do not apply for '
f'directed scans. Not troubleshooting rotor.')
for rotor_dict in self.species_dict[label].rotors_dict.values():
if rotor_dict['pivots'] == pivots:
rotor_dict['scan_path'] = ''
rotor_dict['invalidation_reason'] = invalidation_reason
rotor_dict['success'] = False
else:
# the rotor scan is good, calculate the symmetry number
for rotor_dict in self.species_dict[label].rotors_dict.values():
if rotor_dict['pivots'] == pivots:
if not invalidate:
rotor_dict['success'] = True
rotor_dict['symmetry'] = determine_rotor_symmetry(label=label,
pivots=rotor_dict['pivots'],
energies=energies)[0]
logger.info(f'Rotor scan {scan} between pivots {pivots} for {label} has symmetry '
f'{rotor_dict["symmetry"]}')
else:
rotor_dict['success'] = False
# Save the restart dictionary
self.save_restart_dict()

    def check_directed_scan_job(self, label: str, job: 'JobAdapter'):
        """
        Check that a directed scan job for a specific dihedral angle converged successfully, otherwise troubleshoot.

        Args:
            label (str): The species label.
            job (JobAdapter): The rotor scan job object.
        """
if job.job_status[1]['status'] == 'done':
xyz = parser.parse_geometry(path=job.local_path_to_output_file)
is_isomorphic = self.species_dict[label].check_xyz_isomorphism(xyz=xyz, verbose=False)
for rotor_dict in self.species_dict[label].rotors_dict.values():
if rotor_dict['pivots'] == job.pivots:
key = tuple(f'{dihedral:.2f}' for dihedral in job.dihedrals)
rotor_dict['directed_scan'][key] = {'energy': parser.parse_e_elect(
path=job.local_path_to_output_file),
'xyz': xyz,
'is_isomorphic': is_isomorphic,
'trsh': job.ess_trsh_methods,
}
else:
self.troubleshoot_ess(label=label,
job=job,
level_of_theory=self.scan_level)

    def check_all_done(self, label: str):
        """
        Check that we have all required data for the species/TS.

        Args:
            label (str): The species label.
        """
all_converged = True
if label in self.output and not self.output[label]['convergence']:
for job_type, spawn_job_type in self.job_types.items():
                if spawn_job_type and not self.output[label]['job_types'][job_type] \
                        and not ((self.species_dict[label].is_ts and job_type in ['scan', 'conf_opt'])
                                 or (self.species_dict[label].number_of_atoms == 1
                                     and job_type in ['conf_opt', 'opt', 'fine', 'freq', 'rotors', 'bde'])
                                 or (job_type == 'bde' and self.species_dict[label].bdes is None)
                                 or job_type == 'conf_opt'
                                 or job_type == 'irc'
                                 or job_type == 'tsg'):
logger.debug(f'Species {label} did not converge.')
all_converged = False
break
if label in self.output and all_converged:
self.output[label]['convergence'] = True
if self.species_dict[label].is_ts:
self.species_dict[label].make_ts_report()
logger.info(self.species_dict[label].ts_report + '\n')
zero_delta = datetime.timedelta(0)
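            # Run-time accounting: conformer/TSG jobs run in parallel, so take the longest one
            # (extremum_list with return_min=False), while opt/composite jobs run sequentially
            # and are summed (sum_time_delta).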
            conf_time = extremum_list([job.run_time for job in self.job_dict[label]['conf_opt'].values()],
                                      return_min=False) \
                if 'conf_opt' in self.job_dict[label].keys() else zero_delta
            if 'conf_sp' in self.job_dict[label].keys():
                conf_time = (conf_time or zero_delta) + \
                            (extremum_list([job.run_time for job in self.job_dict[label]['conf_sp'].values()],
                                           return_min=False) or zero_delta)
tsg_time = extremum_list([job.run_time for job in self.job_dict[label]['tsg'].values()], return_min=False) \
if 'tsg' in self.job_dict[label].keys() else zero_delta
opt_time = sum_time_delta([job.run_time for job in self.job_dict[label]['opt'].values()]) \
if 'opt' in self.job_dict[label].keys() else zero_delta
comp_time = sum_time_delta([job.run_time for job in self.job_dict[label]['composite'].values()]) \
if 'composite' in self.job_dict[label].keys() else zero_delta
other_time = extremum_list([sum_time_delta([job.run_time for job in job_dictionary.values()])
for job_type, job_dictionary in self.job_dict[label].items()
if job_type not in ['conf_opt', 'conf_sp', 'opt', 'composite']], return_min=False) \
if any([job_type not in ['conf_opt', 'conf_sp', 'opt', 'composite']
for job_type in self.job_dict[label].keys()]) else zero_delta
self.species_dict[label].run_time = self.species_dict[label].run_time \
or (conf_time or zero_delta) + \
(tsg_time or zero_delta) + \
(opt_time or zero_delta) + \
(comp_time or zero_delta) + \
(other_time or zero_delta)
logger.info(f'\nAll jobs for species {label} successfully converged. '
f'Run time: {self.species_dict[label].run_time}')
            # Todo: any TS which did not converge (any rxn not calculated) should be reported here with full status:
            #       Was the family identified? Were TS guesses found? If so, what's wrong?
elif label in self.species_dict and (not self.species_dict[label].is_ts or self.species_dict[label].ts_guesses_exhausted) and not label.startswith('IRC_'):
job_type_status = {key: val for key, val in self.output[label]['job_types'].items()
if key in self.job_types and self.job_types[key]
and (key != 'irc' or self.species_dict[label].is_ts)}
logger.error(f'Species {label} did not converge. Job type status is: {job_type_status}')
# Update restart dictionary and save the yaml restart file:
self.save_restart_dict()

    def get_server_job_ids(self, specific_server: Optional[str] = None):
        """
        Check job status on a specific server or on all active servers, and get a list of the relevant running job IDs.

        Args:
            specific_server (str, optional): The server to check. If ``None``, check all active servers.
        """
self.server_job_ids = list()
for server in self.servers:
if specific_server is None or server == specific_server:
if server != 'local':
with SSHClient(server) as ssh:
self.server_job_ids.extend(ssh.check_running_jobs_ids())
else:
self.server_job_ids.extend(check_running_jobs_ids())

    def get_completed_incore_jobs(self):
        """
        Check the job status of all incore jobs, and get a list of the relevant completed job IDs.

        Todo: Add tests.
        """
self.completed_incore_jobs = list()
for label, job_names in self.running_jobs.items():
for job_name in job_names:
i = get_i_from_job_name(job_name)
if i is None:
job_type = '_'.join(job_name.split('_')[:-1]) # Consider job types such as 'directed_scan'.
job = self.job_dict[label][job_type][job_name]
elif 'conf_opt' in job_name:
job = self.job_dict[label]['conf_opt'][i]
elif 'conf_sp' in job_name:
job = self.job_dict[label]['conf_sp'][i]
elif 'tsg' in job_name:
job = self.job_dict[label]['tsg'][i]
else:
raise ValueError(f'Did not recognize job {job_name} of species {label}.')
if job.execution_type == 'incore' and job.job_status[0] == 'done':
self.completed_incore_jobs.append(job.job_id)

    def troubleshoot_negative_freq(self,
                                   label: str,
                                   job: 'JobAdapter',
                                   ):
        """
        Troubleshoot cases where non-TS species have negative frequencies.
        Run newly generated conformers.

        Args:
            label (str): The species label.
            job (JobAdapter): The frequency job object.
        """
if not self.trsh_ess_jobs:
            logger.warning(f'Not troubleshooting negative freq for {label} and job {job.job_name}. '
                           f'To enable troubleshooting, set "trsh_ess_jobs" to "True".')
return None
current_neg_freqs_trshed, confs, output_errors, output_warnings = trsh_negative_freq(
label=label, log_file=job.local_path_to_output_file,
neg_freqs_trshed=self.species_dict[label].neg_freqs_trshed, job_types=self.job_types)
self.species_dict[label].neg_freqs_trshed.extend(current_neg_freqs_trshed)
for output_error in output_errors:
self.output[label]['errors'] += output_error
if 'Invalidating species' in output_error:
logger.info(f'Deleting all currently running jobs for species {label}...')
self.delete_all_species_jobs(label)
self.output[label]['convergence'] = False
for output_warning in output_warnings:
self.output[label]['warnings'] += output_warning
        if len(confs):
            logger.info(f'Deleting all currently running jobs for species {label} before troubleshooting for '
                        f'negative frequency with perturbed conformers...')
            logger.info(f'Perturbed conformers:\n{confs}')
            self.delete_all_species_jobs(label)
            self.species_dict[label].conformers = confs
            self.species_dict[label].conformer_energies = [None] * len(confs)
            self.job_dict[label]['conf_opt'] = dict()  # initialize the conformer opt job dictionary
for i, xyz in enumerate(self.species_dict[label].conformers):
self.run_job(label=label,
xyz=xyz,
level_of_theory=self.conformer_opt_level,
job_type='conf_opt',
conformer=i,
)
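    # A schematic of the ``trsh_negative_freq()`` outcome consumed above (hedged; values are made up).
    # ``confs`` would hold geometries perturbed along the negative-frequency mode(s) as ARC xyz dicts:
    #
    #     current_neg_freqs_trshed = [-155.2]  # wavenumbers (cm^-1) already handled for this species
    #     confs = [{'symbols': ('O', 'H', 'H'),
    #               'isotopes': (16, 1, 1),
    #               'coords': ((0.0, 0.0, 0.1), (0.0, 0.76, -0.48), (0.0, -0.76, -0.48))}]
    #
    # Each perturbed conformer is then re-optimized as a fresh 'conf_opt' job.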
[docs] def troubleshoot_scan_job(self,
job: 'JobAdapter',
methods: Optional[dict] = None,
) -> Tuple[bool, dict]:
"""
Troubleshooting rotor scans
Using the following methods:
1. freeze: freezing specific internal coordinates or all torsions other than the scan's pivots
2. inc_res: increasing the scan resolution.
3. change conformer: changing to a conformer with a lower energy
Args:
job (JobAdapter): The scan Job object.
methods (dict): The troubleshooting method/s to try::
{'freeze': <a list of problematic internal coordinates>,
'inc_res': ``None``,
'change conformer': <a xyz dict>}
Returns: Tuple[bool, dict]:
- ``True`` if the troubleshooting is valid.
- The actions are applied in the troubleshooting.
"""
if not self.trsh_ess_jobs:
            logger.warning(f'Not troubleshooting failed scan job {job.job_name}. To enable troubleshooting, '
                           f'set the "trsh_ess_jobs" argument to "True".')
return False, dict()
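        # Illustrative ``methods`` arguments, following the docstring above (hedged; the internal
        # coordinates are made up for this sketch):
        #
        #     methods = {'freeze': [[1, 2, 3, 4]]}        # freeze a problematic torsion
        #     methods = {'inc_res': None}                 # increase the scan resolution
        #     methods = {'change conformer': xyz_dict}    # switch to a lower-energy conformer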
label = job.species_label
trsh_success = False
actual_actions = dict() # If troubleshooting fails, there will be no action.
used_trsh_methods = self.species_dict[label].rotors_dict[job.rotor_index]['trsh_methods'] \
if job.rotor_index in self.species_dict[label].rotors_dict else list()
# Check trsh_counter to avoid infinite rotor trsh looping.
if self.species_dict[label].rotors_dict[job.rotor_index]['trsh_counter'] >= max_rotor_trsh:
next_with_ordinal = get_number_with_ordinal_indicator(self.species_dict[label].rotors_dict[job.rotor_index]['trsh_counter'] + 1)
logger.error(f"The rotor {self.species_dict[label].rotors_dict[job.rotor_index]['pivots']} of species "
f"{label} was troubleshooted for "
f"{self.species_dict[label].rotors_dict[job.rotor_index]['trsh_counter']} times, "
f"will not troubleshoot for the {next_with_ordinal} time.")
return trsh_success, actual_actions
# Increase the trsh_counter.
self.species_dict[label].rotors_dict[job.rotor_index]['trsh_counter'] += 1
        # A lower-energy conformation was found.
        if 'change conformer' in methods:
            # Delete all of this species' jobs whether or not the conformer change succeeds.
            # If it succeeds, running jobs must be cancelled to avoid conflicts.
            # If it fails, then either the new conformer is incorrect, or this troubleshooting
            # method was already applied before without yielding a good result.
self.delete_all_species_jobs(label)
new_xyz = methods['change conformer']
            # Check whether this conformer was already used in a previous troubleshooting attempt.
            for used_trsh_method in used_trsh_methods:
                if 'change conformer' in used_trsh_method \
                        and compare_confs(new_xyz, used_trsh_method['change conformer']):
                    # This conformer was already used for troubleshooting; invalidate this attempt.
                    logger.error(f'The change conformer method for {label} is invalid. '
                                 f'ARC will not change to the same conformer twice.')
                    break
            else:
                # This conformer was not tried before; check for isomorphism.
is_isomorphic = self.species_dict[label].check_xyz_isomorphism(
allow_nonisomorphic_2d=self.allow_nonisomorphic_2d,
xyz=new_xyz)
if is_isomorphic:
self.species_dict[label].final_xyz = new_xyz
                    # Remove all completed rotor calculation information.
                    for rotor in self.species_dict[label].rotors_dict.values():
                        # Don't reset all parameters; e.g., `times_dihedral_set` needs to remain as is.
                        rotor['scan_path'] = ''
                        rotor['invalidation_reason'] = ''
                        rotor['success'] = None
                        rotor['symmetry'] = None
                        if rotor['scan'] == torsions_to_scans(job.torsions)[0]:
                            rotor['times_dihedral_set'] += 1
                        # Keep the 'change conformer' trsh info, but clear other trsh methods (e.g.,
                        # freezing or increasing the scan resolution) so they can be re-applied to the
                        # new conformer if needed.
                        rotor['trsh_methods'] = [trsh_method for trsh_method in rotor['trsh_methods']
                                                 if 'change conformer' in trsh_method]
# Re-run opt (or composite) on the new initial_xyz with the desired dihedral.
if not self.composite_method:
self.run_opt_job(label)
else:
self.run_composite_job(label)
trsh_success = True
actual_actions = methods
return trsh_success, actual_actions
            # Either the new conformer is wrong, or we are stuck in a loop, switching to the same conformer again.
self.output[label]['errors'] += \
f'A lower conformer was found for {label} via a torsion mode, ' \
f'but it is not isomorphic with the 2D graph representation ' \
f'{self.species_dict[label].mol.copy(deep=True).to_smiles()}. ' \
f'Not calculating this species.'
self.output[label]['conformers'] += 'Unconverged'
self.output[label]['convergence'] = False
else:
# Get the scan_list, useful for freezing or increasing the scan resolution.
scan_list = [rotor_dict['scan'] for rotor_dict in
self.species_dict[label].rotors_dict.values()]
try:
scan_trsh, scan_res = trsh_scan_job(label=label,
scan_res=job.scan_res,
scan=torsions_to_scans(job.torsions)[0],
scan_list=scan_list,
methods=methods,
log_file=job.local_path_to_output_file,
)
except TrshError as e:
logger.error(f'Troubleshooting of the rotor scan of pivots '
f'{self.species_dict[label].rotors_dict[job.rotor_index]["pivots"]} for '
f'{label} failed. Got:\n{e}\nJob info:\n{job}')
except InputError as e:
logger.debug(f'Got invalid input for trsh_scan_job: {e}\nJob info:\n{job}')
else:
if scan_trsh or job.scan_res != scan_res:
for action in used_trsh_methods:
if isinstance(action, dict) and 'scan_trsh' in action and 'scan_res' in action \
and action['scan_trsh'] == scan_trsh and action['scan_res'] == scan_res:
break
else:
# Valid troubleshooting method for freezing or increasing resolution.
trsh_success = True
actual_actions = {'scan_trsh': scan_trsh, 'scan_res': scan_res}
self.run_job(label=label,
xyz=job.xyz,
level_of_theory=job.level,
job_type='scan',
torsions=job.torsions,
scan_trsh=scan_trsh,
trsh={'scan_res': scan_res} if scan_res is not None else None,
rotor_index=job.rotor_index,
)
return trsh_success, actual_actions
[docs] def troubleshoot_opt_jobs(self, label):
"""
        Troubleshoot opt jobs.
        First check the server status and troubleshoot if needed. Then check the ESS status and troubleshoot
        if needed. Finally, check whether the last job ran with a fine grid, and spawn a fine-grid job if it didn't.
Args:
label (str): The species label.
"""
if not self.trsh_ess_jobs:
            logger.warning(f'Not troubleshooting failed opt job for {label}. To enable troubleshooting, set the '
                           f'"trsh_ess_jobs" argument to "True".')
return None
previous_job_num, latest_job_num = -1, -1
job = None
for job_name in self.job_dict[label]['opt'].keys(): # get the latest Job object for the species / TS
job_name_int = int(job_name[5:])
if job_name_int > latest_job_num:
previous_job_num = latest_job_num
latest_job_num = job_name_int
job = self.job_dict[label]['opt'][job_name]
if job.job_status[0] == 'done':
if job.job_status[1]['status'] == 'done':
if job.fine:
# run_opt_job should not be called if all looks good...
logger.error(f'opt job for {label} seems right, yet "run_opt_job" was called.')
raise SchedulerError(f'opt job for {label} seems right, yet "run_opt_job" was called.')
else:
# Run opt again using a finer grid.
self.parse_opt_geo(label=label, job=job)
xyz = self.species_dict[label].final_xyz
self.species_dict[label].initial_xyz = xyz # save for troubleshooting, since trsh goes by initial
self.run_job(label=label,
xyz=xyz,
level_of_theory=self.opt_level,
job_type='opt',
fine=True,
)
else:
                trsh_opt = True
                # The job terminated normally on the server, but the ESS calculation failed.
if previous_job_num >= 0 and job.fine:
previous_job = self.job_dict[label]['opt']['opt_a' + str(previous_job_num)]
if not previous_job.fine and previous_job.job_status[0] == 'done' \
and previous_job.job_status[1]['status'] == 'done' \
and 'all_attempted' in job.ess_trsh_methods:
# The present job with a fine grid failed in the ESS calculation.
# A *previous* job without a fine grid terminated successfully on the server and ESS.
# So use the xyz determined w/o the fine grid, and output an error message to alert users.
logger.error(f'Optimization job for {label} with a fine grid terminated successfully '
f'on the server, but crashed during calculation after troubleshooting. NOT running with fine '
f'grid again.')
self.parse_opt_geo(label=label, job=previous_job)
trsh_opt = False
if trsh_opt:
self.troubleshoot_ess(label=label,
job=job,
level_of_theory=self.opt_level)
else:
job.troubleshoot_server()
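    # A note on the name parsing above: opt job names follow the 'opt_a<number>' pattern
    # (cf. the 'opt_a' + str(previous_job_num) lookup), so the job number is recovered via:
    #
    #     int('opt_a38'[5:])  # -> 38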
[docs] def troubleshoot_ess(self,
label: str,
job: 'JobAdapter',
level_of_theory: Union[Level, dict, str],
conformer: Optional[int] = None,
):
"""
        Troubleshoot issues related to the electronic structure software, such as convergence errors.
Args:
label (str): The species label.
job (JobAdapter): The job object to troubleshoot.
level_of_theory (Level, dict, str): The level of theory to use.
conformer (int, optional): The conformer index.
"""
if not self.trsh_ess_jobs:
logger.warning(f'Not troubleshooting failed {label} job {job.job_name}. '
f'To enable troubleshooting, set the "trsh_ess_jobs" argument to "True".')
return None
level_of_theory = Level(repr=level_of_theory)
logger.info('\n')
        warning_message = f'Troubleshooting {label} job {job.job_name} which failed'
        if job.job_status[1]["status"] and job.job_status[1]["status"] != 'done':
            warning_message += f' with status: "{job.job_status[1]["status"]}",'
        if job.job_status[1]["keywords"]:
            warning_message += f'\nwith keywords: {job.job_status[1]["keywords"]}'
        warning_message += f' in {job.job_adapter}. '
        if job.job_status[1]["error"] and job.job_status[1]["line"]:
            warning_message += f'The error "{job.job_status[1]["error"]}" was derived from the following line in the ' \
                               f'log file:\n"{job.job_status[1]["line"]}".'
logger.warning(warning_message)
if self.species_dict[label].is_ts and conformer is not None:
xyz = self.species_dict[label].ts_guesses[conformer].get_xyz()
elif conformer is not None:
xyz = self.species_dict[label].conformers[conformer]
else:
xyz = self.species_dict[label].final_xyz or self.species_dict[label].initial_xyz
if 'Unknown' in job.job_status[1]['keywords'] and 'change_node' not in job.ess_trsh_methods:
job.ess_trsh_methods.append('change_node')
job.troubleshoot_server()
if job.job_name not in self.running_jobs[label]:
self.running_jobs[label].append(job.job_name) # mark as a running job
if job.job_adapter == 'gaussian':
if self.species_dict[label].checkfile is None:
self.species_dict[label].checkfile = job.checkfile
# Determine if the species is a hydrogen atom (or its isotope).
is_h = self.species_dict[label].number_of_atoms == 1 and \
self.species_dict[label].mol.atoms[0].element.symbol in ['H', 'D', 'T']
output_errors, ess_trsh_methods, remove_checkfile, level_of_theory, \
software, job_type, fine, trsh_keyword, memory, shift, cpu_cores, couldnt_trsh = \
trsh_ess_job(label=label,
level_of_theory=level_of_theory,
server=job.server,
job_status=job.job_status[1],
is_h=is_h,
job_type=job.job_type,
num_heavy_atoms=self.species_dict[label].number_of_heavy_atoms,
software=job.job_adapter,
fine=job.fine,
memory_gb=job.job_memory_gb,
cpu_cores=job.cpu_cores,
ess_trsh_methods=job.ess_trsh_methods,
)
for output_error in output_errors:
self.output[label]['errors'] += output_error
if 'Could not troubleshoot' in output_error and 'tsg' in job.job_name:
self.species_dict[label].ts_guesses[get_i_from_job_name(job.job_name)].errors += f'; {output_error}'
if remove_checkfile:
self.species_dict[label].checkfile = None
job.ess_trsh_methods = ess_trsh_methods
if not couldnt_trsh:
self.run_job(label=label,
xyz=xyz,
level_of_theory=level_of_theory,
job_adapter=software,
memory=memory,
job_type=job_type,
fine=fine,
ess_trsh_methods=ess_trsh_methods,
trsh=trsh_keyword,
conformer=conformer,
torsions=job.torsions,
dihedrals=job.dihedrals,
directed_scan_type=job.directed_scan_type,
rotor_index=job.rotor_index,
cpu_cores=cpu_cores,
shift=shift,
)
elif self.species_dict[label].is_ts and not self.species_dict[label].ts_guesses_exhausted:
logger.info(f'TS {label} did not converge. '
f'Status is:\n{self.species_dict[label].ts_checks}\n'
f'Searching for a better TS conformer...')
self.switch_ts(label=label)
self.save_restart_dict()
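    # A sketch of the ``trsh_ess_job()`` contract assumed above (hedged; see arc.job.trsh for the
    # authoritative behavior). It inspects the failed job's status and the methods already tried,
    # and returns updated job parameters, e.g. (illustrative values only):
    #
    #     ess_trsh_methods = ['change_node', 'scf=(qc)']  # cumulative, including the newly added method
    #     remove_checkfile, fine, couldnt_trsh = False, True, False
    #
    # When ``couldnt_trsh`` is ``True`` no job is re-spawned, and a TS falls back to the next guess.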
[docs] def delete_all_species_jobs(self, label: str):
"""
Delete all jobs of a species/TS.
Args:
label (str): The species label.
"""
logger.debug(f'Deleting all jobs for species {label}')
        for job_type, job_type_dict in self.job_dict[label].items():
            # Note: 'conf_opt', 'conf_sp', and 'tsg' job dicts are keyed by an index rather than by a job name.
            for job_name, job in job_type_dict.items():
                if label in self.running_jobs.keys() and job_name in self.running_jobs[label] \
                        and job.execution_type != 'incore':
                    prefix = job_type if job_type in ['conf_opt', 'conf_sp', 'tsg'] else ''
                    logger.info(f'Deleted job {prefix}{job_name}')
                    job.delete()
self.running_jobs[label] = list()
self.output[label]['paths'] = {key: '' if key != 'irc' else list() for key in self.output[label]['paths'].keys()}
[docs] def restore_running_jobs(self):
"""
Make Job objects for jobs which were running in the previous session.
        Important for the restart feature, so that long jobs are not run twice.
"""
jobs = self.restart_dict['running_jobs']
        if not jobs or not any(jobs.values()):
del self.restart_dict['running_jobs']
self.running_jobs = dict()
logger.debug('It seems that there are no running jobs specified in the ARC restart file. '
'Assuming all jobs have finished.')
else:
for spc_label in jobs.keys():
if spc_label not in self.running_jobs.keys():
self.running_jobs[spc_label] = list()
for job_description in jobs[spc_label]:
if ('conformer' not in job_description or job_description['conformer'] is None) \
and ('tsg' not in job_description or job_description['tsg'] is None):
self.running_jobs[spc_label].append(job_description['job_name'])
elif 'conformer' in job_description:
self.running_jobs[spc_label].append(f'conformer{job_description["conformer"]}')
elif 'tsg' in job_description:
self.running_jobs[spc_label].append(f'tsg{job_description["tsg"]}')
for species in self.species_list:
if species.label == spc_label:
break
else:
raise SchedulerError(f'Could not find species {spc_label} in the restart file')
job_description['species'] = [self.species_dict[label] for label in job_description['species_labels']] \
if 'species_labels' in job_description else None
if 'species_labels' in job_description:
del job_description['species_labels']
job_description['reactions'] = [self.rxn_dict[i] for i in job_description['reaction_indices']] \
if 'reaction_indices' in job_description else None
if 'reaction_indices' in job_description:
del job_description['reaction_indices']
job = job_factory(**job_description)
if spc_label not in self.job_dict.keys():
self.job_dict[spc_label] = dict()
if job_description['job_type'] not in self.job_dict[spc_label].keys():
if ('conformer' not in job_description or job_description['conformer'] is None) \
and ('tsg' not in job_description or job_description['tsg'] is None):
self.job_dict[spc_label][job_description['job_type']] = dict()
                    elif job_description.get('conformer', None) is not None \
                            and 'conf_opt' not in self.job_dict[spc_label].keys():
                        self.job_dict[spc_label]['conf_opt'] = dict()
                    elif job_description.get('tsg', None) is not None \
                            and 'tsg' not in self.job_dict[spc_label].keys():
                        self.job_dict[spc_label]['tsg'] = dict()
if ('conformer' not in job_description or job_description['conformer'] is None) \
and ('tsg' not in job_description or job_description['tsg'] is None):
self.job_dict[spc_label][job_description['job_type']][job_description['job_name']] = job
elif 'conformer' in job_description and job_description['conformer'] is not None:
if 'conf_opt' not in self.job_dict[spc_label].keys():
self.job_dict[spc_label]['conf_opt'] = dict()
self.job_dict[spc_label]['conf_opt'][int(job_description['conformer'])] = job
# don't generate additional conformers for this species
self.dont_gen_confs.append(spc_label)
elif 'tsg' in job_description and job_description['tsg'] is not None:
if 'tsg' not in self.job_dict[spc_label].keys():
self.job_dict[spc_label]['tsg'] = dict()
self.job_dict[spc_label]['tsg'][int(job_description['tsg'])] = job
self.server_job_ids.append(job.job_id)
if self.job_dict:
content = 'Restarting ARC, tracking the following jobs spawned in a previous session:'
for spc_label in self.job_dict.keys():
content += f'\n{spc_label}: '
for job_type in self.job_dict[spc_label].keys():
for job_name in self.job_dict[spc_label][job_type].keys():
if job_type not in ['conf_opt', 'conf_sp', 'tsg']:
content += job_name + ', '
elif 'conf_' in job_type:
content += self.job_dict[spc_label][job_type][job_name].job_name \
+ f' (conformer{job_name}), '
elif job_type == 'tsg':
content += self.job_dict[spc_label][job_type][job_name].job_name \
+ f' (tsg{job_name}), '
content += '\n\n'
logger.info(content)
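    # A hypothetical restart_dict['running_jobs'] entry consumed by this method (hedged; real entries
    # are produced by JobAdapter.as_dict() and contain additional keys):
    #
    #     {'spc1': [{'job_type': 'opt', 'job_name': 'opt_a38', 'job_id': 1234567, 'server': 'server1'},
    #               {'job_type': 'conf_opt', 'job_name': 'conf_opt_0', 'conformer': 0}]}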
[docs] def save_restart_dict(self):
"""
Update the restart_dict and save the restart.yml file.
"""
if self.save_restart and self.restart_dict is not None:
logger.debug('Creating a restart file...')
self.restart_dict['output'] = self.output
self.restart_dict['output_multi_spc'] = self.output_multi_spc
self.restart_dict['species'] = [spc.as_dict() for spc in self.species_dict.values()]
self.restart_dict['running_jobs'] = dict()
for spc in self.species_dict.values():
if spc.label in self.running_jobs:
self.restart_dict['running_jobs'][spc.label] = \
[self.job_dict[spc.label][job_name.rsplit('_', 1)[0]][job_name].as_dict()
for job_name in self.running_jobs[spc.label]
if all(x not in job_name for x in ['conf_opt', 'conf_sp', 'tsg'])] \
+ [self.job_dict[spc.label]['conf_opt'][get_i_from_job_name(job_name)].as_dict()
for job_name in self.running_jobs[spc.label] if 'conf_opt' in job_name] \
+ [self.job_dict[spc.label]['conf_sp'][get_i_from_job_name(job_name)].as_dict()
for job_name in self.running_jobs[spc.label] if 'conf_sp' in job_name] \
+ [self.job_dict[spc.label]['tsg'][get_i_from_job_name(job_name)].as_dict()
for job_name in self.running_jobs[spc.label] if 'tsg' in job_name]
logger.debug(f'Dumping restart dictionary:\n{self.restart_dict}')
save_yaml_file(path=self.restart_path, content=self.restart_dict)
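    # A note on the lookup above: for jobs that are not conformer/TS-guess jobs, the job type is
    # recovered from the job name by stripping the trailing token, e.g.:
    #
    #     'opt_a38'.rsplit('_', 1)[0]   # -> 'opt'
    #     'scan_a7'.rsplit('_', 1)[0]   # -> 'scan'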
[docs] def make_reaction_labels_info_file(self):
"""
        A helper function for creating the `reaction labels.info` file.
"""
rxn_info_path = os.path.join(self.project_directory, 'output', 'rxns', 'reaction labels.info')
        old_file_path = os.path.join(self.project_directory, 'output', 'rxns', 'reaction labels.old.info')
if os.path.isfile(rxn_info_path):
if os.path.isfile(old_file_path):
os.remove(old_file_path)
shutil.copy(rxn_info_path, old_file_path)
os.remove(rxn_info_path)
if not os.path.exists(os.path.dirname(rxn_info_path)):
os.makedirs(os.path.dirname(rxn_info_path))
with open(rxn_info_path, 'w') as f:
            f.write('Reaction labels and respective TS labels:\n\n')
return rxn_info_path
[docs] def determine_adaptive_level(self,
original_level_of_theory: Level,
job_type: str,
heavy_atoms: int,
) -> Level:
"""
Determine the level of theory to be used according to the job type and number of heavy atoms.
self.adaptive_levels is a dictionary of levels of theory for ranges of the number of heavy atoms in the
species. Keys are tuples of (min_num_atoms, max_num_atoms), values are dictionaries with job type tuples
as keys and levels of theory as values. The string 'inf' is accepted instead of an integer in max_num_atoms.
Args:
original_level_of_theory (Level): The level of theory for non-sp/opt/freq job types.
job_type (str): The job type for which the level of theory is determined.
            heavy_atoms (int): The number of heavy atoms in the species.
        Returns:
            Level: The level of theory to be used.
"""
        for atom_range, adaptive_level in self.adaptive_levels.items():
            if (atom_range[1] == 'inf' and heavy_atoms >= atom_range[0]) \
                    or (atom_range[1] != 'inf' and atom_range[0] <= heavy_atoms <= atom_range[1]):
                break
else:
raise SchedulerError(f'Could not determine adaptive level of theory for {heavy_atoms} heavy atoms using '
f'the following adaptive levels:\n{self.adaptive_levels}')
for job_type_tuple, level in adaptive_level.items():
if job_type in job_type_tuple:
return level
# for any other job type use the original level of theory regardless of the number of heavy atoms
return original_level_of_theory
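    # An illustrative ``adaptive_levels`` dict following the docstring above (hedged; the levels of
    # theory below are made-up examples):
    #
    #     adaptive_levels = {(1, 5):      {('opt', 'freq'): Level(repr='wb97xd/6-311+g(2d,2p)'),
    #                                      ('sp',): Level(repr='ccsd(t)-f12/aug-cc-pvtz-f12')},
    #                        (6, 'inf'):  {('opt', 'freq'): Level(repr='b3lyp/6-31g(d,p)'),
    #                                      ('sp',): Level(repr='wb97xd/6-311+g(2d,2p)')}}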
[docs] def initialize_output_dict(self, label: Optional[str] = None):
"""
Initialize self.output.
        Do not initialize keys that will contain paths ('geo', 'freq', 'sp', 'composite'),
        since their existence indicates that the respective job terminated, which is used for restart purposes.
        If ``label`` is not ``None``, initialize for a specific species, otherwise initialize for all species.
Args:
label (str, optional): A species label.
"""
if label is not None or not self._does_output_dict_contain_info():
for species in self.species_list:
if label is None or species.label == label:
if species.label not in self.output:
self.output[species.label] = dict()
if species.multi_species not in self.output_multi_spc:
self.output_multi_spc[species.multi_species] = dict()
if 'paths' not in self.output[species.label]:
self.output[species.label]['paths'] = dict()
path_keys = ['geo', 'freq', 'sp', 'composite']
for key in path_keys:
if key not in self.output[species.label]['paths']:
self.output[species.label]['paths'][key] = ''
if 'irc' not in self.output[species.label]['paths'] and species.is_ts:
self.output[species.label]['paths']['irc'] = list()
if 'job_types' not in self.output[species.label]:
self.output[species.label]['job_types'] = dict()
for job_type in list(set(self.job_types.keys())) + ['opt', 'freq', 'sp', 'composite', 'onedmin']:
if job_type in ['rotors', 'bde']:
# rotors could be invalidated due to many reasons,
# also could be falsely identified in a species that has no torsional modes.
self.output[species.label]['job_types'][job_type] = True
else:
self.output[species.label]['job_types'][job_type] = False
keys = ['conformers', 'isomorphism', 'convergence', 'restart', 'errors', 'warnings', 'info']
for key in keys:
if key not in self.output[species.label]:
if key == 'convergence':
self.output[species.label][key] = None
else:
self.output[species.label][key] = ''
def _does_output_dict_contain_info(self):
"""
Determine whether self.output contains any information other than the initialized structure.
Returns:
bool: Whether self.output contains any information, ``True`` if it does.
"""
for species_output_dict in self.output.values():
for key0, val0 in species_output_dict.items():
if key0 in ['paths', 'job_types']:
for key1, val1 in species_output_dict[key0].items():
if val1 and key1 not in ['rotors', 'bde']:
return True
else:
if val0:
return True
return False
[docs] def generate_final_ts_guess_report(self):
"""
        Generate a TS report for this ARC project and save it as a YAML file.
"""
content = dict()
for species in self.species_dict.values():
if species.is_ts:
ts_dict = dict()
ts_dict['multiplicity'] = species.multiplicity
ts_dict['charge'] = species.charge
ts_dict['external_symmetry'] = species.external_symmetry
ts_dict['optical_isomers'] = species.optical_isomers
ts_dict['run_time'] = str(species.run_time)
ts_dict['successful_methods'] = species.successful_methods
ts_dict['unsuccessful_methods'] = species.unsuccessful_methods
ts_dict['chosen_ts'] = species.chosen_ts
ts_dict['chosen_ts_list'] = species.chosen_ts_list
ts_dict['ts_guesses_exhausted'] = species.ts_guesses_exhausted
ts_dict['ts_report'] = species.ts_report
ts_dict['rxn_label'] = species.rxn_label
ts_dict['rxn_index'] = species.rxn_index
for reaction in self.rxn_list:
if reaction.ts_label == species.label:
ts_dict['family'] = reaction.family.label if reaction.family is not None else None
break
else:
ts_dict['family'] = None
ts_guesses = dict()
for tsg in species.ts_guesses:
ts_guesses[tsg.index] = tsg.as_dict(for_report=True)
ts_dict['ts_guesses'] = ts_guesses
content[species.label] = ts_dict
tsg_fig_path = os.path.join(self.project_directory, 'output', 'rxns', species.label, 'ts_guesses.png')
plotter.plot_ts_guesses_by_e_and_method(species=species, path=tsg_fig_path)
path = os.path.join(self.project_directory, 'output', 'rxns', 'TS_guess_report.yml')
if content:
save_yaml_file(path=path, content=content)
[docs] def save_e_elect(self, label: str):
"""
Save the electronic energy of the corresponding species.
It will append if the file already exists.
"""
path = os.path.join(self.project_directory, 'output', 'e_elect_summary.yml')
content = dict()
if os.path.isfile(path):
content = read_yaml_file(path)
content[label] = self.species_dict[label].e_elect
save_yaml_file(path=path, content=content)
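    # The resulting e_elect_summary.yml is a flat label-to-energy mapping, e.g.
    # (illustrative values, in the units carried by species.e_elect):
    #
    #     spc1: -123456.789
    #     TS0: -234567.891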
[docs] def check_max_simultaneous_jobs_limit(self, server: Optional[str]):
"""
Check if the number of running jobs on the server is not above the set server limit.
Args:
server (str): The server name.
"""
        if server is not None and 'max_simultaneous_jobs' in servers_dict[server]:
            continue_looping = True
            while continue_looping:
                self.get_server_job_ids(specific_server=server)
                if len(self.server_job_ids) >= servers_dict[server]['max_simultaneous_jobs']:
                    time.sleep(90)
                else:
                    continue_looping = False
self.get_server_job_ids()
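    # The optional settings entry assumed above (a hedged sketch of a settings['servers'] record;
    # only 'max_simultaneous_jobs' is relevant here):
    #
    #     servers = {'server1': {'cluster_soft': 'Slurm',
    #                            'address': 'server1.host.edu',
    #                            'max_simultaneous_jobs': 500}}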
[docs]def species_has_freq(species_output_dict: dict,
yml_path: Optional[str] = None,
) -> bool:
"""
    Checks whether a species has valid converged frequencies using its output dict.
    Args:
        species_output_dict (dict): The species output dict (i.e., Scheduler.output[label]).
        yml_path (str): The species Arkane YAML file path.
    Returns: bool
        Whether the species has valid converged frequencies.
"""
if yml_path is not None:
return True
if species_output_dict['paths']['freq'] or species_output_dict['paths']['composite']:
return True
return False
[docs]def species_has_geo(species_output_dict: dict,
yml_path: Optional[str] = None,
) -> bool:
"""
    Checks whether a species has a valid converged geometry using its output dict.
    Args:
        species_output_dict (dict): The species output dict (i.e., Scheduler.output[label]).
        yml_path (str): The species Arkane YAML file path.
    Returns: bool
        Whether the species has a valid converged geometry.
"""
if yml_path is not None:
return True
if species_output_dict['paths']['geo'] or species_output_dict['paths']['composite']:
return True
return False
[docs]def species_has_sp(species_output_dict: dict,
yml_path: Optional[str] = None,
) -> bool:
"""
    Checks whether a species has a valid converged single-point energy using its output dict.
    Args:
        species_output_dict (dict): The species output dict (i.e., Scheduler.output[label]).
        yml_path (str): The species Arkane YAML file path.
    Returns: bool
        Whether the species has a valid converged single-point energy.
"""
if yml_path is not None:
return True
if species_output_dict['paths']['sp'] or species_output_dict['paths']['composite']:
return True
return False
[docs]def species_has_sp_and_freq(species_output_dict: dict,
yml_path: Optional[str] = None,
) -> bool:
"""
Checks whether a species has a valid converged single-point energy and valid converged frequencies.
Args:
species_output_dict (dict): The species output dict (i.e., Scheduler.output[label]).
        yml_path (str): The species Arkane YAML file path.
Returns: bool
Whether a species has a valid converged single-point energy and frequencies.
"""
return species_has_sp(species_output_dict, yml_path) and species_has_freq(species_output_dict, yml_path)
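# A quick illustration of the helpers above (a minimal output dict with only the keys these
# functions inspect):
#
#     species_output_dict = {'paths': {'geo': '/path/opt.log', 'freq': '', 'sp': '', 'composite': ''}}
#     species_has_geo(species_output_dict)           # -> True
#     species_has_sp_and_freq(species_output_dict)   # -> False
#     species_has_sp_and_freq(species_output_dict, yml_path='/path/spc.yml')  # -> True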