#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) Huawei Technologies Co., Ltd. 2023-2023. All rights reserved

# An example script for querying jobs from an LSF cluster.
# A method named "main" must be declared in this script for job query.
#
# Usage: query ["<id> <id> ..."]
#   - no argument: every unfinished job reported by `bjobs` is listed;
#   - one argument: a space separated list of job ids. An array element is
#     written "<jobId>.<index>" by the caller and "<jobId>[<index>]" by LSF.

# expected output format:
"""
{
    "data":{
        "successData":[
            {
                "jobId":"1",
                "user":"ccp_sysadmin",
                "cmd":"date",
                "state":"SUCCEEDED",
                "queue":"default",
                "execPath":"/home/ccs_cli",
                "jobName":"st_submit_basic_task_test",
                "submitNode":"node1",
                "taskExecNodes":"node2",
                "description":"TEMPLATE=Generic^TEMPLATE_TYPE=^DISPLAY_FLAG=^SECURITY_LEVEL=0",
                "exitCode":0,
                "exitMessage":"Done successfully.",
                "createTime":"2023-11-28T21:27:28Z",
                "startTime":"2023-11-28T21:27:28Z",
                "endTime":"2023-11-28T21:27:36Z",
                "preHook":{
                    "cmd": "/share/share_lsf/pre-exec.sh"
                },
                "postHook":{
                    "cmd": "/share/share_lsf/post-exec.sh"
                },
                "logRedirectPath":{
                    "stderrRedirectPath":"/home/ccs_cli/error.log",
                    "stdoutRedirectPath":"/home/ccs_cli/output.log"
                },
                "traceMessages":[
                    "Job <1>, Job Name <st_submit_basic_task_test>, User <ccp_sysadmin>, Status <DONE>, Queue <default>, Command <date>",
                    "Tue Nov 28 21:27:28 2023: Submitted from host <node1>, to Queue <default>, CWD <$HOME>;",
                    "Tue Nov 28 21:27:28 2023: Dispatched 1 Task(s) on Host(s) <node2>, Allocated 1 Slot(s) on Host(s) <node2>;",
                    "Tue Nov 28 21:27:28 2023: Starting (Pid 2333);",
                    "Tue Nov 28 21:27:36 2023: Done successfully. The CPU time used is 0.0 seconds;",
                    "Tue Nov 28 21:27:36 2023: Post job process done successfully;"
                ]
            }
        ],
        "errorData":{
            "3": "Job <3> is not found",
            "4": "No matching job found"
        }
    },
    "message":"List job detail infos successfully.",
    "code":"success"
}

"successData" holds one such object per job that was found.
"""

import datetime
import json
import os
import sys
import re

try:
    from shlex import quote as _shell_quote  # Python 3
except ImportError:
    from pipes import quote as _shell_quote  # Python 2

try:
    from dateutil import parser
except ImportError:
    # dateutil is the preferred parser; without it only the two timestamp
    # layouts listed in _FALLBACK_TIME_FORMATS are understood.
    parser = None

# "{}" receives the (shell-quoted) job id, or nothing to list all jobs.
B_JOBS_CMD = 'source @SCHEDULER_PROFILE_PATH@; timeout 10 bjobs -UF {} 2>&1'
B_HIST_CMD = 'source @SCHEDULER_PROFILE_PATH@; timeout 10 bhist -UF {} 2>&1'

# Line printed by bjobs/bhist between two job records.
JOB_SEPARATOR = '------------------------------------------------------------------------------'

# NOTE(review): timestamps are formatted exactly as LSF printed them (no time
# zone conversion) yet carry a "Z" suffix - confirm the consumer expects this.
TIME_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
_FALLBACK_TIME_FORMATS = ('%a %b %d %H:%M:%S %Y', '%a %b %d %H:%M:%S')

SUCCESS = 'successData'
ERROR = 'errorData'

DATA_MAP = {
    'successData': [],
    'errorData': dict()
}

# jobStateMap for donau
STATE_MAP = {
    'UNKWN': 'UNRECOGNIZED',
    'RUN': 'RUNNING',
    'PSUSP': 'STOPPED',
    'USUSP': 'STOPPED',
    'SSUSP': 'SSTOPPED',
    'PEND': 'PENDING',
    'DONE': 'SUCCEEDED',
    'EXIT': 'FAILED',
    'WAIT': 'WAITING',
    'ZOMBI': 'UNRECOGNIZED'
}

# "<prefix>value>" items of the job header line and the result key they fill.
_HEADER_FIELDS = (
    ('Job <', 'jobId'),
    ('User <', 'user'),
    ('Job Name <', 'jobName'),
    ('Queue <', 'queue'),
    ('Command <', 'cmd'),
    ('Job Description <', 'description'),
)

# Optional items of the "Submitted from host" line: (prefix, section, key).
_SUBMIT_FIELDS = (
    ('Output File <', 'logRedirectPath', 'stdoutRedirectPath'),
    ('Error File <', 'logRedirectPath', 'stderrRedirectPath'),
    ('Pre-execution Command <', 'preHook', 'cmd'),
    ('Post-execution Command <', 'postHook', 'cmd'),
)

# LSF prints "Completed <exit>; <reason>". The "<status>" token is optional in
# the pattern so that a bare "Completed ;<reason>" is accepted as well.
_COMPLETED_REASON = re.compile(r': Completed (?:<\w+>)?;(.*)')
_COMPLETED_STATUS = re.compile(r': Completed <(\w+)>')
_EXIT_CODE = re.compile(r'Exited with exit code (.*?)\.')
_ARRAY_INDEX = re.compile(r'(\[.*?\])')


def _to_lsf_job_id(job_id):
    """Convert a caller supplied id ("12.3") to the LSF form ("12[3]")."""
    if '.' not in job_id:
        return job_id
    pieces = job_id.split('.')
    return pieces[0] + '[' + pieces[1] + ']'


def _to_external_job_id(job_id):
    """Convert an LSF id ("12[3]") back to the caller form ("12.3")."""
    if '[' not in job_id:
        return job_id
    return job_id.replace('[', '.').replace(']', '')


# Job ids requested on the command line, already in LSF form. split() without
# an argument drops the empty entries a doubled space would produce; an empty
# id would otherwise make bjobs list every job.
JOB_ID_LIST = list()

if len(sys.argv) > 1:
    JOB_ID_LIST.extend(_to_lsf_job_id(raw_id) for raw_id in sys.argv[1].split())


def main():
    """Query the requested jobs (or all unfinished ones) and print the JSON result."""
    result = dict()
    if not JOB_ID_LIST:
        query_job_info_without_job_id()
    else:
        for _job_id in JOB_ID_LIST:
            query_job_info_with_id(_job_id)
    result['message'] = 'List job detail infos successfully.'
    result['code'] = 'success'
    result['data'] = DATA_MAP
    print(json.dumps(result))


def build_query_cmd(template, job_id=''):
    """Return the shell command for *template* (B_JOBS_CMD or B_HIST_CMD).

    The job id comes from the command line and is interpolated into a shell
    string, so it is quoted: this blocks command injection and also stops the
    shell from glob-expanding the "[index]" part of an array element id.
    An empty id is inserted as-is, which makes LSF list all jobs.
    """
    if not job_id:
        return template.format('')
    return template.format(_shell_quote(job_id))


def _run_query(template, job_id=''):
    """Run the LSF command and return its combined stdout/stderr text."""
    pipe = os.popen(build_query_cmd(template, job_id))
    try:
        return pipe.read()
    finally:
        pipe.close()


def query_job_info_with_id(job_id):
    """Look one job up with bjobs first and fall back to bhist.

    When neither command knows the job, an entry is added to the error data.
    """
    if not _query_job_by_bjobs(job_id):
        _query_job_by_bhist(job_id)


def query_job_info_without_job_id():
    """Collect every unfinished job reported by a bare `bjobs -UF`."""
    job_info = _run_query(B_JOBS_CMD)
    if 'No unfinished job found' in job_info:
        return
    _parse_job_infos(job_info)


def _format_timestamp(text):
    """Return *text* reformatted with TIME_FORMAT, or '' when it is no timestamp."""
    text = text.strip()
    if not text:
        return ''
    if parser is not None:
        try:
            moment = parser.parse(text)
        except (ValueError, OverflowError):
            return ''
        return moment.strftime(TIME_FORMAT)
    for layout in _FALLBACK_TIME_FORMATS:
        try:
            moment = datetime.datetime.strptime(text, layout)
        except ValueError:
            continue
        if '%Y' not in layout:
            # Like dateutil, assume the current year when LSF omits it.
            moment = moment.replace(year=datetime.date.today().year)
        return moment.strftime(TIME_FORMAT)
    return ''


def _set_time(job, key, text):
    """Store the timestamp found in *text* under *key*; keep the old value if unparsable."""
    stamp = _format_timestamp(text)
    if stamp:
        job[key] = stamp


def _parse_header(job, line):
    """Fill the static fields from the "Job <id>, User <..>, ..." header line."""
    for item in line.split(','):
        field = item.strip()
        if field.startswith('Status <'):
            job['state'] = STATE_MAP.get(_safe_get_value('Status <', field), 'UNRECOGNIZED')
            continue
        for prefix, key in _HEADER_FIELDS:
            if field.startswith(prefix):
                job[key] = _safe_get_value(prefix, field)
                break


def _parse_submission(job, line, from_bhist):
    """Fill submit host, working directory, hooks, log paths and create time."""
    job['submitNode'] = _safe_get_value('Submitted from host <', line)
    job['execPath'] = _safe_get_value('CWD <', line)
    for prefix, section, key in _SUBMIT_FIELDS:
        value = _safe_get_value(prefix, line)
        if value:
            job[section][key] = value
    _set_time(job, 'createTime', line.split(': Submitted from host')[0])
    # Only the bhist parser read the queue from the submission event.
    if from_bhist and ' to Queue <' in line:
        job['queue'] = _safe_get_value(' to Queue <', line)


def _parse_start(job, line, from_bhist):
    """Fill the start time from the start event of bjobs or bhist."""
    if from_bhist:
        if ': Starting ' in line:
            _set_time(job, 'startTime', line.split(': Starting')[0])
        return
    if ' started ' not in line:
        return
    marker = ' started '
    if '[' in job['jobId']:
        # Array elements print ": [index] started ..."; cut before the index.
        index = _ARRAY_INDEX.findall(job['jobId'])
        if index:
            marker = ': ' + index[0] + marker
    _set_time(job, 'startTime', line.split(marker)[0])


def _parse_line(job, line, from_bhist):
    """Update *job* from one output line of bjobs (default) or bhist."""
    # Static job information.
    if line.startswith('Job '):
        _parse_header(job, line)
        return
    # Dynamic (event) information.
    if 'Submitted from host ' in line:
        _parse_submission(job, line, from_bhist)
        return
    if ': Dispatched ' in line:
        job['taskExecNodes'] = _safe_get_value('on Host\\(s\\) <', line)
    _parse_start(job, line, from_bhist)
    # bjobs prints "Status <..>" in the header, so the state is only derived
    # from events when the data comes from bhist.
    if ': Done successfully.' in line:
        job['exitCode'] = 0
        job['exitMessage'] = 'Done successfully.'
        _set_time(job, 'endTime', line.split(': Done successfully.')[0])
        if from_bhist:
            job['state'] = 'SUCCEEDED'
        return
    if ': Completed ' in line:
        _set_time(job, 'endTime', line.split(': Completed ')[0])
        if from_bhist:
            status = _COMPLETED_STATUS.search(line)
            if status is not None and status.group(1).lower() == 'done':
                job['state'] = 'SUCCEEDED'
                job['exitMessage'] = 'Done successfully'
            else:
                # A failed job must not be reported as "Done successfully";
                # the message stays empty unless LSF gives a reason below.
                job['state'] = 'FAILED'
    reason = _COMPLETED_REASON.search(line)
    if reason is not None:
        job['exitMessage'] = reason.group(1)
    exit_codes = _EXIT_CODE.findall(line)
    if exit_codes:
        try:
            job['exitCode'] = int(exit_codes[0])
        except ValueError:
            pass  # keep the default exit code when LSF prints a non-number
        if from_bhist:
            job['state'] = 'FAILED'


def parse_job_output(job_info, from_bhist=False):
    """Parse `bjobs -UF` / `bhist -UF` output into a list of job dictionaries.

    :param job_info: raw command output; records are split on JOB_SEPARATOR.
    :param from_bhist: True when the text comes from bhist, whose events are
        worded differently and which prints no "Status <..>" header item.
    :return: one dictionary (see _init_job_data) per record that carries a job
        id. Records without one - blank chunks, LSF error texts - are skipped
        instead of being reported as empty "successful" jobs.
    """
    jobs = list()
    for chunk in job_info.split(JOB_SEPARATOR):
        lines = chunk.strip().split('\n')
        job = _init_job_data()
        for line in lines:
            _parse_line(job, line, from_bhist)
        if not job['jobId']:
            continue
        job['traceMessages'] = lines
        # Adapt the job id to the form used by the caller.
        job['jobId'] = _to_external_job_id(job['jobId'])
        jobs.append(job)
    return jobs


def _parse_job_infos(job_info):
    """Parse bjobs output into the success data; return the number of jobs found."""
    jobs = parse_job_output(job_info)
    DATA_MAP[SUCCESS].extend(jobs)
    return len(jobs)


def _query_job_by_bjobs(job_id):
    """Query one job with bjobs; return True when at least one record was parsed."""
    job_info = _run_query(B_JOBS_CMD, job_id)
    if 'Job <{}> is not found'.format(job_id) in job_info:
        return False
    return _parse_job_infos(job_info) > 0


def _safe_get_value(key, item):
    """Return the text between *key* and the next '>' in *item*, or ''.

    *key* is inserted into a regular expression unescaped, so callers escape
    regex metacharacters themselves (e.g. 'on Host\\(s\\) <').
    """
    found = re.search(r'{}(.*?)>'.format(key), item.strip())
    if found is None:
        return ''
    return found.group(1)


def _init_job_data():
    """Return a job dictionary with every output field set to its default."""
    dct = dict()
    key_set = ['jobId', 'user', 'cmd', 'state', 'queue', 'execPath', 'jobName', 'submitNode', 'taskExecNodes',
               'description',
               'exitMessage', 'createTime', 'startTime', 'endTime']
    for key in key_set:
        dct[key] = ''
    dct['exitCode'] = -1
    dct['preHook'] = dict()
    dct['postHook'] = dict()
    dct['logRedirectPath'] = dict()
    dct['traceMessages'] = list()
    return dct


def _query_job_by_bhist(job_id):
    """Query one finished job with bhist.

    Parsed jobs go to the success data and True is returned. When bhist knows
    nothing about the job (or returns nothing usable) its output is stored in
    the error data under the caller-facing job id and False is returned.
    """
    job_info = _run_query(B_HIST_CMD, job_id)
    jobs = list()
    if 'No matching job found' not in job_info:
        jobs = parse_job_output(job_info, from_bhist=True)
    if not jobs:
        message = job_info.strip() or 'Failed to query job <{}>'.format(job_id)
        DATA_MAP[ERROR][_to_external_job_id(job_id)] = message
        return False
    DATA_MAP[SUCCESS].extend(jobs)
    return True


if __name__ == '__main__':
    main()
从网址 https://gitee.com/openeuler/portal-mulit-cluster-script/tree/HPC_23.0.0_release 下载压缩包, 解压至{INSTALL_PATH}/huawei/portal/ac/scripts/scheduler/{SCHEDULER_TYPE}/目录下; 注:INSTALL_PATH为client安装目录,SCHEDULER_TYPE为调度器类型 @@ -40,6 +41,7 @@ Python2/Python3 │ ├── job │ └── jobSample ├── job + │ ├── query │ ├── rerun │ ├── resume │ ├── stop