"""
A module for running jobs on the local machine.
"""
import datetime
import os
import re
import shutil
import subprocess
import time
from typing import Callable, List, Optional, Tuple, Union

from arc.common import get_logger
from arc.exceptions import SettingsError
from arc.imports import settings
from arc.job.ssh import check_job_status_in_stdout

logger = get_logger()

# Pull the configuration tables used by the functions in this module out of ARC's settings.
servers = settings['servers']
check_status_command = settings['check_status_command']
submit_command = settings['submit_command']
submit_filenames = settings['submit_filenames']
delete_command = settings['delete_command']
output_filenames = settings['output_filenames']

def execute_command(command: Union[str, List[str]],
                    shell: bool = True,
                    no_fail: bool = False,
                    ) -> Tuple[Optional[list], Optional[list]]:
    """
    Execute a command.

    Notes:
        If ``no_fail`` is ``True`` and the command raises a ``CalledProcessError``, a warning is logged
        and ``(None, None)`` is returned so that the calling function can debug the situation.

    Args:
        command (Union[str, List[str]]): A single command, or a list of commands that are chained with ``&&``.
        shell (bool, optional): Passed on to ``subprocess.run`` (whether to execute through the shell).
        no_fail (bool, optional): If ``True`` then ARC will not crash if an error is encountered.

    Raises:
        SettingsError: If the command kept failing after all retries.

    Returns: Tuple[Optional[list], Optional[list]]:
        - A list of lines of standard output stream.
        - A list of lines of the standard error stream.
    """
    error = None

    if not isinstance(command, list):
        command = [command]
    # Chain all commands into one line so that a failing command stops the ones after it.
    command = [' && '.join(command)]
    # ``i`` starts at 1 and the loop condition is strict, so at most ``max_times_to_try - 1`` attempts are made.
    i, max_times_to_try = 1, 30
    sleep_time = 60  # Seconds
    while i < max_times_to_try:
        try:
            # NOTE(review): without ``check=True``, ``subprocess.run`` does not raise ``CalledProcessError``
            # on a non-zero exit status, so the handler below is not expected to trigger. It is left as is
            # because callers in this module inspect the returned stderr lines to detect failures.
            completed_process = subprocess.run(command, shell=shell, capture_output=True)
            return _format_stdout(completed_process.stdout), _format_stdout(completed_process.stderr)
        except subprocess.CalledProcessError as e:
            error = e  # Store the error so we can raise a SettingsError if needed.
            if no_fail:
                _output_command_error_message(command, e, logger.warning)
                return None, None
            else:
                _output_command_error_message(command, e, logger.error)
                logger.error(f'ARC is sleeping for {sleep_time * i} seconds before retrying.\nPlease check whether '
                             f'this is a server issue by executing the command manually on the server.')
                logger.info('ZZZZZ..... ZZZZZ.....')
                time.sleep(sleep_time * i)  # In seconds; the waiting time grows linearly with the attempt number.
                i += 1

    # If unsuccessful:
    raise SettingsError(f'The command "{command}" is erroneous, got: \n{error}'
                        f'\nThis maybe either a server issue or the command is wrong.'
                        f'\nTo check if this is a server issue, please run the command on server and restart ARC.'
                        f'\nTo correct the command, modify settings.py'
                        f'\nTips: use "which" command to locate cluster software commands on server.'
                        f'\nExample: type "which sbatch" on a server running Slurm to find the correct '
                        f'sbatch path required in the submit_command dictionary.') from error
def _output_command_error_message(command: List[str],
                                  error: subprocess.CalledProcessError,
                                  logging_func: Callable[..., None],
                                  ) -> None:
    """
    Formats and logs the error message returned from a command at the desired logging level.

    Args:
        command (List[str]): The command that threw the error.
        error (subprocess.CalledProcessError): The exception caught by python from subprocess.
        logging_func (Callable): ``logger.warning`` or ``logger.error`` as a function object.
    """
    logging_func('The server command is erroneous.')
    logging_func(f'Tried to submit the following command:\n{command}')
    logging_func('And got the following status (cmd, message, output, return code)')
    logging_func(error.cmd)
    # The separators between the fields are always logged at the info level.
    logger.info('\n')
    logging_func(error)
    logger.info('\n')
    logging_func(error.output)
    logger.info('\n')
    logging_func(error.returncode)


def _format_stdout(stdout: bytes) -> List[str]:
    """
    Format the stdout as a list of unicode strings.

    Args:
        stdout (bytes): The standard output.

    Returns: List[str]
        The decoded lines from stdout.
    """
    return [line.decode() for line in stdout.splitlines()]
def check_job_status(job_id: int) -> str:
    """
    Query the local server's queue and report the status of a job.

    Possible status values: ``before_submission``, ``running``, ``errored on node xx``, ``done``

    Status line formats:

    OGE::

        540420 0.45326 xq1340b    user_name       r     10/26/2018 11:08:30 long1@node18.cluster

    Slurm::

        14428     debug xq1371m2   user_name  R 50-04:04:46      1 node06

    PBS::

                                                                              Req'd     Req'd       Elap
        Job ID                  Username    Queue    Jobname         SessID NDS   TSK    Memory    Time      S Time
        ----------------------- ----------- -------- --------------- ------ ----- ------ --------- --------- - ---------
        2016614.zeldo.local     u780444     workq    scan.pbs        75380  1     10     --        730:00:00 R 00:00:20
        2016616.zeldo.local     u780444     workq    scan.pbs        75380  1     10     --        730:00:00 R 00:00:20

    HTCondor (using ARC's modified condor_q command)::

        3261.0 R 10 28161 a2719 56
        3263.0 R 10 28161 a2721 23
        3268.0 R 10 28161 a2726 18
        3269.0 R 10 28161 a2727 17
        3270.0 P 10 28161 a2728 23

    Args:
        job_id (int): The ID of the job to look for.

    Returns: str
        The job status, as determined by ``check_job_status_in_stdout``.
    """
    server = 'local'
    cluster_soft = servers[server]['cluster_soft']
    status_lines = execute_command(check_status_command[cluster_soft])[0]
    return check_job_status_in_stdout(job_id=job_id, stdout=status_lines, server=server)
def delete_job(job_id: Union[int, str]) -> None:
    """
    Deletes a running job.

    Args:
        job_id (Union[int, str]): The ID of the job to delete.

    Raises:
        RuntimeError: If the delete command wrote to stderr and the job is still listed as running.
    """
    cmd = f"{delete_command[servers['local']['cluster_soft']]} {job_id}"
    # Anything written to stderr is treated as a possible failure of the delete command.
    stderr = execute_command(cmd, no_fail=True)[1]
    if not stderr:
        return
    logger.warning(f'Detected possible error when trying to delete job {job_id}. Checking to see if the job is '
                   f'still running...')
    running_jobs = check_running_jobs_ids()
    # The running job IDs are strings, compare as a string so that an integer ``job_id`` is matched as well.
    if str(job_id) in running_jobs:
        logger.error(f'Job {job_id} was scheduled for deletion, but the deletion command has appeared to errored. '
                     f'The job is still running.')
        raise RuntimeError(f'Could not delete job {job_id}')
    logger.info(f'Job {job_id} is no longer running.')
def check_running_jobs_ids() -> List[str]:
    """
    List the IDs of the jobs that the status command of the local server currently reports.

    Raises:
        ValueError: If the cluster software defined for the local server is not supported.

    Returns: List[str]
        List of job IDs.
    """
    configured_soft = servers['local']['cluster_soft']
    cluster_soft = configured_soft.lower()
    supported = ('slurm', 'oge', 'sge', 'pbs', 'htcondor')
    if cluster_soft not in supported:
        raise ValueError(f"Server cluster software {configured_soft} is not supported.")
    # The command table is indexed with the name exactly as it appears in the servers dictionary.
    status_lines = execute_command(check_status_command[configured_soft])[0]
    return parse_running_jobs_ids(status_lines, cluster_soft=cluster_soft)
def parse_running_jobs_ids(stdout: List[str],
                           cluster_soft: Optional[str] = None,
                           ) -> List[str]:
    """
    A helper function for parsing job IDs from the stdout of a job status command.

    Args:
        stdout (List[str]): The stdout of a job status command.
        cluster_soft (Optional[str]): The cluster software (case-insensitive).
                                      Defaults to the cluster software of the local server.

    Returns: List[str]
        List of job IDs.
    """
    cluster_soft = (cluster_soft or servers['local']['cluster_soft']).lower()
    # Index of the last line to skip per cluster software (presumably the header lines of the status output).
    i_dict = {'slurm': 0, 'oge': 1, 'sge': 1, 'pbs': 4, 'htcondor': -1}
    # The job ID is whatever precedes the first occurrence of this separator in the line.
    split_by_dict = {'slurm': ' ', 'oge': ' ', 'sge': ' ', 'pbs': '.', 'htcondor': '.'}
    last_skipped_index, separator = i_dict[cluster_soft], split_by_dict[cluster_soft]
    running_job_ids = list()
    for i, status_line in enumerate(stdout):
        if i <= last_skipped_index:
            continue
        job_id = f'{status_line.strip().split(separator)[0]}'
        # job_id is sometimes the string representation of bytes, i.e., "b'123'", extract the content.
        if "b'" in job_id:
            job_id = job_id.split("b'")[1].split("'")[0]
        # Blank lines do not carry a job ID.
        if job_id:
            running_job_ids.append(job_id)
    return running_job_ids
def submit_job(path: str,
               ) -> Tuple[Optional[str], Optional[str]]:
    """
    Submit a job.

    Args:
        path (str): The job's folder path, where the submit script is located (just the folder path, w/o the filename).

    Returns: Tuple[Optional[str], Optional[str]]
        job_status, job_id. Both are ``None`` if the submit command gave no output even after a second attempt.
    """
    job_status, job_id = '', ''
    cluster_soft = servers['local']['cluster_soft']
    cmd = f"cd {path}; {submit_command[cluster_soft]} {submit_filenames[cluster_soft]}"
    stdout, stderr = execute_command(cmd)
    if not stdout:
        # No output at all, wait a little and try once more.
        time.sleep(10)
        stdout, stderr = execute_command(cmd)
    if not stdout:
        return None, None
    if stderr:
        logger.warning(f'Got the following error when trying to submit job:\n{stderr}.')
        job_status = 'errored'
    else:
        job_id = _determine_job_id(stdout=stdout)
        # Without a recognizable job ID the status is left as an empty string.
        job_status = 'running' if job_id else job_status
    return job_status, job_id
def _determine_job_id(stdout: List[str],
                      cluster_soft: Optional[str] = None,
                      ) -> str:
    """
    Determine the job ID right after it was submitted from the stdout.

    Args:
        stdout (List[str]): The stdout got from submitting a job.
        cluster_soft (Optional[str]): The cluster software (case-insensitive).
                                      Defaults to the cluster software of the local server.

    Raises:
        ValueError: If the cluster software is not one of OGE, SGE, Slurm, PBS, or HTCondor.

    Returns: str
        The job ID, or an empty string if it could not be identified in ``stdout``.
    """
    cluster_soft = (cluster_soft or servers['local']['cluster_soft']).lower()
    if cluster_soft not in ('oge', 'sge', 'slurm', 'pbs', 'htcondor'):
        raise ValueError(f'Unrecognized cluster software: {cluster_soft}')
    if not stdout:
        return ''
    first_line = stdout[0]
    if cluster_soft == 'pbs':
        return first_line.split('.')[0]
    if cluster_soft == 'htcondor':
        # The ID is taken from the last token of the second line, up to the first dot.
        if 'submitting' in first_line.lower() and len(stdout) > 1:
            tokens = stdout[1].split()
            if tokens:
                return tokens[-1].split('.')[0]
        return ''
    # OGE / SGE report the ID as the third token of the first line, Slurm as the fourth.
    token_index = 3 if cluster_soft == 'slurm' else 2
    tokens = first_line.split()
    if 'submitted' in first_line.lower() and len(tokens) > token_index:
        return tokens[token_index]
    return ''
def get_last_modified_time(file_path_1: str,
                           file_path_2: Optional[str] = None,
                           ) -> Optional[datetime.datetime]:
    """
    Get the last modified time of ``file_path_1`` if it is an existing file,
    otherwise fall back to the last modified time of ``file_path_2`` (if given).

    Args:
        file_path_1 (str): The path to file 1.
        file_path_2 (str, optional): The path to file 2.

    Returns: Optional[datetime.datetime]
        The last modified time, or ``None`` if it could not be read from either path.
    """
    mtime = None
    if os.path.isfile(file_path_1):
        try:
            mtime = os.path.getmtime(file_path_1)
        except OSError:
            # The file might have vanished in the meantime, fall back to the second path.
            mtime = None
    if mtime is None:
        if file_path_2 is None:
            return None
        try:
            mtime = os.path.getmtime(file_path_2)
        except OSError:
            return None
    return datetime.datetime.fromtimestamp(mtime)
def write_file(file_path: str,
               file_string: str,
               ) -> None:
    """
    Save ``file_string`` as the entire content of the file at ``file_path``.
    The file is opened in ``'w'`` mode, so existing content is replaced.

    Args:
        file_path (str): The file path.
        file_string (str): The content to be written into the file.
    """
    with open(file_path, mode='w') as output_file:
        output_file.write(file_string)
def rename_output(local_file_path: str,
                  software: str,
                  ) -> None:
    """
    Move the software-specific output file to ``local_file_path`` for consistency between software.

    Args:
        local_file_path (str): The full path to the output.out file.
        software (str): The software used for the job by which the original output file name was determined.
    """
    software = software.lower()
    # The software-specific output file is looked for in the same folder as ``local_file_path``.
    software_output_path = os.path.join(os.path.dirname(local_file_path), output_filenames[software])
    for _ in range(5):
        if os.path.isfile(local_file_path) or os.path.isfile(software_output_path):
            break
        # Wait for file to be transferred on the server (the head node might be busy).
        time.sleep(6)
    else:
        # Neither file showed up, nothing to rename.
        return None
    if os.path.isfile(software_output_path):
        shutil.move(src=software_output_path, dst=local_file_path)
def change_mode(mode: str,
                file_name: str,
                recursive: bool = False,
                path: str = '',
                ) -> None:
    """
    Run ``chmod`` on a file or a directory.

    Args:
        mode (str): The mode change to be applied, can be either octal or symbolic.
        file_name (str): The path to the file or the directory to be changed.
        recursive (bool, optional): Whether to recursively change the mode to all files under a directory.
                                    ``True`` for recursively change.
        path (str, optional): The directory path at which the command will be executed.
                              If it points to a file, the file's directory is used.
    """
    if os.path.isfile(path):
        path = os.path.dirname(path)
    recursive_flag = ' -R' if recursive else ''
    chmod_command = f'chmod{recursive_flag} {mode} {file_name}'
    # Only change directory first if a path was given.
    commands = [f'cd {path}', chmod_command] if path else [chmod_command]
    execute_command(command=commands)
def delete_all_local_arc_jobs(jobs: Optional[List[Union[str, int]]] = None) -> None:
    """
    Delete all ARC-spawned jobs (with job name starting with `a` and a digit) from the local server.
    Make sure you know what you're doing, so unrelated jobs won't be deleted...
    Useful when terminating ARC while some (ghost) jobs are still running.

    Args:
        jobs (List[Union[str, int]], optional): Specific ARC job IDs to delete.
                                                If ``None``, all jobs with a matching name are deleted.

    Raises:
        ValueError: If the cluster software of the local server is not recognized.
    """
    server = 'local'
    if server not in servers:
        return
    print('\nDeleting all ARC jobs from local server...')
    configured_soft = servers[server]['cluster_soft']
    cluster_soft = configured_soft.lower()
    # With ``no_fail=True`` the returned stdout may be ``None``, treat this as an empty status listing.
    stdout = execute_command(check_status_command[configured_soft], no_fail=True)[0] or list()
    for status_line in stdout:
        # ARC job names are matched as a space followed by the letter "a" and digits.
        s = re.search(r' a\d+', status_line)
        if s is None:
            continue
        job_name = s.group()[1:]  # Drop the leading space.
        if jobs is not None and job_name not in jobs:
            continue
        server_job_id = None
        if cluster_soft in ['slurm', 'pbs']:
            server_job_id = status_line.split()[0]
            delete_job(server_job_id)
        elif cluster_soft in ['oge', 'sge']:
            # For OGE / SGE the job name is passed to the delete command.
            delete_job(job_name)
        elif cluster_soft == 'htcondor':
            server_job_id = status_line.split()[0].split('.')[0]
            delete_job(server_job_id)
        else:
            raise ValueError(f'Unrecognized cluster software {cluster_soft}.')
        aux_text = f' ({server_job_id} on server)' if server_job_id is not None else ''
        print(f'deleted job {job_name}{aux_text}.')
    print('\ndone.')