From ad1d0b0d36f09f711247fb97275473109cc74501 Mon Sep 17 00:00:00 2001 From: yaokai13 Date: Thu, 10 Jun 2021 10:51:33 +0800 Subject: [PATCH] add some function to check_service --- core/check_meta_service.py | 503 +++++++++++++++++++++++++++++++------ openeuler_obs.py | 3 + 2 files changed, 426 insertions(+), 80 deletions(-) diff --git a/core/check_meta_service.py b/core/check_meta_service.py index 41e0ec6..ea19e53 100644 --- a/core/check_meta_service.py +++ b/core/check_meta_service.py @@ -18,8 +18,13 @@ check the format and URL of the service file """ import os import sys +import requests +Now_path = os.path.join(os.path.split(os.path.realpath(__file__))[0]) +sys.path.append(os.path.join(Now_path, "..")) from common.log_obs import log from xml.etree import ElementTree as ET +from bs4 import BeautifulSoup +import xml.dom.minidom class CheckMetaPull(object): """ @@ -27,11 +32,19 @@ class CheckMetaPull(object): """ def __init__(self, **kwargs): """ - kawrgs:dict,init dict by 'a': 'A' style + kawrgs: dict,init dict by 'a': 'A' style prid: the pullrequest id + branch: which gitee branch you want to ckeck + token: giteee api access_token + current_path: your codes path + check_error: The list which check error """ self.kwargs = kwargs self.prid = self.kwargs['pr_id'] + self.branch = self.kwargs['branch'] + self.token = self.kwargs['access_token'] + self.current_path = os.getcwd() + self.check_error = [] def _clean(self): """ @@ -57,18 +70,20 @@ class CheckMetaPull(object): if "Already" in pull_result[0]: log.info(pull_result) log.info("Success to clone obs_meta") - break + return else: os.popen("if [ -d obs_meta ];then rm -rf obs_meta") + raise SystemExit("*******obs_meta-clone-error:please check your net*******") - def _get_new_pkg(self): + def _get_change_file(self): """ Gets a list of newly added files in obs_meta """ + current_path = os.getcwd() fetch_cmd = "git fetch origin pull/%s/head:thispr" % self.prid branch_cmd = "git branch -a | grep 'thispr'" checkout_cmd = "git checkout thispr" - changed_file_cmd = "git log --name-status -1" + changed_file_cmd = "git diff --name-status master...thispr" for x in range(5): os.chdir("obs_meta") fetch_result = os.popen(fetch_cmd).read() @@ -83,90 +98,418 @@ class CheckMetaPull(object): self._get_latest_obs_meta() show_result = os.popen(checkout_cmd).readlines() log.info(show_result) - changed_file_result = os.popen(changed_file_cmd).readlines() - log.info(changed_file_result) - new_pkg_path = [] - for pkg in changed_file_result: - if "A\t" in pkg: - all_msg = pkg.replace("\n", "") - log.info(all_msg) - pkg_path = all_msg.replace("A\t", "") - new_pkg_path.append(pkg_path) - log.info("All_change_pkg:%s" % new_pkg_path) - if not new_pkg_path: - log.info("There have no Package add") - sys.exit() + changed_file = os.popen(changed_file_cmd).readlines() + log.info(changed_file) + os.chdir("../") + return changed_file + + def _parse_git_log(self, line): + """ + deal diff_patch line mesg + """ + log.info("line:%s" % line) + new_file_path = '' + log_list = list(line.split()) + temp_log_type = log_list[0] + if len(log_list) == 3: + if log_list[1].split('/')[0] != log_list[2].split('/')[0] and \ + log_list[1].split('/')[-2] == log_list[2].split('/')[-2]: + log.error("ERROR_COMMIT: %s" % line) + log.error("FAILED:PR contains the movement of same package \ + between branches") + raise SystemExit("*******PLEASE CHECK YOUR PR*******") + else: + new_file_path = list(line.split())[2] + elif len(log_list) == 2: + if temp_log_type != "D": + new_file_path = list(line.split())[1] + log.info(new_file_path) + return new_file_path + + def _check_file_format(self, file_path): + """ + Function modules: Check the format of the service file + """ + try: + ET.parse(file_path) + log.info("**************FORMAT CORRECT***************") + log.info("The %s has a nice format" % file_path) + log.info("**************FORMAT CORRECT***************") + except Exception as e: + log.error("**************FORMAT ERROR*****************") + log.error("MAY be %s has a bad format" % file_path) + log.error("%s format bad Because:%s" % (file_path, e)) + log.error("**************FORMAT ERROR*****************") + return "yes" + + def _get_url_info(self, pkg_path): + """ + get pkg_name and pkg_branch from service file + """ + url_list = [] + with open(pkg_path, "r") as f: + log.info('\n' + f.read()) + dom = xml.dom.minidom.parse(pkg_path) + root = dom.documentElement + param_lines = root.getElementsByTagName('param') + for url in param_lines: + if url.getAttribute("name") == "url": + url_list.append(url.firstChild.data.strip('/')) + log.info("ALL_URL:%s" % url_list) + return url_list + + def _get_path_info(self, pkg_path): + """ + get pkg_name and pkg_branch from change path + """ + pkg_name = pkg_path.split('/')[-2] + #Support Multi_Version + if "Multi" in pkg_path.split('/')[0] or "multi" in pkg_path.split('/')[0]: + pkg_url = pkg_path.split('/')[1] else: - return new_pkg_path + pkg_url = pkg_path.split('/')[0] + pkg_info = [pkg_name, pkg_url] + return pkg_info - def _check_pkg_services(self, pkg_path_list): + def _pkgname_check(self, pkg_name, service_name_list): + """ + check the pkg_name from change_path and service_name_list """ - Function modules: Check the format of the service file and the correctness of its URL + if pkg_name in service_name_list: + log.info("SUCCESS_CHECK:The %s in _service url is correct" % pkg_name) + return + else: + log.error("**************_Service URL ERROR*****************") + log.error("FAILED:The %s in _service pkgname is not same" % pkg_name) + error_flag = "yes" + return error_flag + + def _check_correspond_from_service(self, **info): + """ + Distingguish check pkg_name between different branch """ error_flag = "" - service_flag = "" - for pkg_path in pkg_path_list: - if "_service" in pkg_path: - service_flag = "exist" - try: - ET.parse(pkg_path) - log.info("**************FORMAT CORRECT***************") - log.info("The %s has a nice _service" % pkg_path) - log.info("**************FORMAT CORRECT***************") - except Exception as e: - log.error("**************FORMAT ERROR*****************") - log.error("MAY be %s has a bad _service format" % pkg_path) - log.error("%s/_service format bad Because:%s" % (pkg_path, e)) - log.error("**************FORMAT ERROR*****************") - error_flag = "yes" - service_path = pkg_path - url_result = "" - with open(service_path, "r") as f: - log.info('\n' + f.read()) - f.seek(0, 0) - for url in f.readlines(): - if "name=\"url\"" in url.replace("\n", ""): - all_url = url.replace("\n", "").split('/') - #log.info(all_url) - spkg_name = all_url[-2].replace("<", "") - spkg_url = all_url[-3] - pkg_name = pkg_path.split('/')[-2] - pkg_url = pkg_path.split('/')[0] - log.info("Service_pkgname:%s" % spkg_name) - log.info("Service_pkgurl:%s" % spkg_url) - log.info("Pkgname:%s" % pkg_name) - log.info("Pkg_url:%s" % pkg_url) - if spkg_url == "openEuler" and pkg_url == "master": - if spkg_name == pkg_name: - log.info("Yes:The %s in _service url is correct" % pkg_name) - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED The %s in _service pkgname is not same" % pkg_name) - error_flag = "yes" - elif spkg_url == pkg_url: - if spkg_name == pkg_name: - log.info("Yes:The %s in _service url is correct" % pkg_name) - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED The %s in _service pkgname is not same" % pkg_name) - error_flag = "yes" - else: - log.error("**************_Service URL ERROR*****************") - log.error("FAILED You need to check your %s _service again" % pkg_name) - error_flag = "yes" - break - os.chdir("../") - self._clean() + if list(set(info['ser_branch']))[0] == "openEuler" \ + and info['pkg_url'] == "master" and len(list(set(info['ser_branch']))) == 1\ + and list(set(info['ser_next']))[0] == "next" \ + and len(list(set(info['ser_next']))) == 1: + error_flag = self._pkgname_check(info['pkg_name'], info['ser_name']) + elif list(set(info['ser_branch']))[0] == info['pkg_url'] \ + and len(list(set(info['ser_branch']))) == 1 \ + and list(set(info['ser_next']))[0] == "next" \ + and len(list(set(info['ser_next']))) == 1: + error_flag = self._pkgname_check(info['pkg_name'], info['ser_name']) + else: + log.error("**************_Service URL ERROR*****************") + log.error("FAILED:Please check the branch in your %s _service again" + % info['pkg_name']) + error_flag = "yes" + return error_flag + + def _check_pkg_services(self, pkg_path): + """ + Check the format of the service file and the correctness of its URL + """ + service_name = [] + service_branch = [] + service_next = [] + error_flag2 = "" + error_flag3 = "" + os.chdir("%s/obs_meta" % self.current_path) + error_flag1 = self._check_file_format(pkg_path) + if error_flag1 != "yes": + url_list = self._get_url_info(pkg_path) + for all_url in url_list: + service_next.append(all_url.split('/')[0]) + service_name.append(all_url.split('/')[-1]) + service_branch.append(all_url.split('/')[-2]) + path_info = self._get_path_info(pkg_path) + log.info("Url_Next:%s" % service_next) + log.info("Pkgname_in_Service:%s" % service_name) + log.info("pkgbranch_in_Service:%s" % service_branch) + log.info("Pkgname_in_Obe_meta_path:%s" % path_info[0]) + log.info("Pkgbranch_in_Obs_meta_path:%s" % path_info[1]) + all_info = {'ser_branch':service_branch, 'pkg_url':path_info[1], + 'ser_next':service_next, 'pkg_name':path_info[0], + 'ser_name':service_name} + error_flag2 = self._check_correspond_from_service(**all_info) + if "-b" not in sys.argv: + error_flag3 = self._detect_protect_branch(pkg_path) + error_flag = error_flag1 or error_flag2 or error_flag3 + if error_flag: + self.check_error.append(pkg_path) + return error_flag + + def _check_meta_pro_and_repo(self, pro_repo): + """ + get the project and repository from meta and distinguish + """ + all_path_flag = [] + for msg in pro_repo: + log.info("msg:%s" % msg) + msglist = str(msg).split() + for key in msglist: + if "project=" in key: + project = key.split("\"")[1] + if "repository=" in key: + repository = key.split("\"")[1] + log.info(project) + log.info(repository) + error_flag = self._exist_for_pro_and_repo(project, repository) + all_path_flag.append(error_flag) + if "yes" in all_path_flag: + return "yes" + + def _exist_for_pro_and_repo(self, project, repository): + """ + check the project and repository is or not exist in obs + """ + error_flag = None + exit_cmd = "osc r %s --xml 2>/dev/null | grep %s" % (project, repository) + exit_result = os.popen(exit_cmd).read() + log.info(exit_result) + if not exit_result: + error_flag = "yes" + log.error("Failed:The %s Not exit in %s" % (repository, project)) + else: + log.info("Success:The %s exit in %s" % (repository, project)) + return error_flag + + def _check_pro_meta(self, pkg_path): + """ + check the _meta have the maintainer or path is or not + """ + os.chdir("%s/obs_meta" % self.current_path) + error_flag1 = self._check_file_format(pkg_path) + error_flag2 = "" + error_flag3 = "" + with open(pkg_path, "r") as f: + parse = f.read() + log.info('\n' + parse) + soup = BeautifulSoup(parse, "html.parser") + maintainers = soup.find_all('person') + pro_repo = soup.find_all('path') + log.info("maintainers:%s" % maintainers) + log.info("pro_repo:%s" % pro_repo) + if not maintainers: + error_flag2 = "yes" + log.error("The _meta file does not have the maintainers section!!!!!!") + error_flag3 = self._check_meta_pro_and_repo(pro_repo) + error_flag = error_flag1 or error_flag2 or error_flag3 if error_flag == "yes": - raise SystemExit("*******PLEASE CHECK AGAIN*******") - if service_flag == "": - log.info("There has no pkg add to project") + self.check_error.append(pkg_path) + return error_flag - def do_all(self): + def _multi_name_check(self, change_file): """ - Assemble all inspection processes and provide external interfaces + check the multi_version format + """ + path_list = change_file.split('/') + path_list_len = len(path_list) + log.info("multi_name_check:%s" % path_list) + error_flag2 = "" + error_flag1 = "" + if path_list_len >= 3: + multi_branch = path_list[2] + multi_branch_list = multi_branch.split('_') + multi_branch_head = multi_branch_list[0] + multi_branch_mid = multi_branch_list[1] + multi_branch_tail = multi_branch_list[2] + branch_list = os.listdir(os.path.join(self.current_path, "obs_meta")) + if path_list_len == 4: + multi_project = path_list[3] + multi_pro_head = multi_project.split(':Multi-Version:')[0] + multi_pro_tail = multi_project.split(':Multi-Version:')[1] + if os.path.exists(os.path.join(self.current_path, "obs_meta", + multi_branch_tail)): + project_list = os.listdir(os.path.join(self.current_path, "obs_meta", + multi_branch_tail)) + else: + project_list = None + log.error("The %s is not exists!,please check your %s!!!" + % (multi_branch_tail, multi_branch)) + error_flag1 = "yes" + if branch_list: + log.info("multi_branch_head:%s" % multi_branch_head) + log.info("multi_branch_tail:%s" % multi_branch_tail) + log.info("branch_list:%s" % branch_list) + if multi_branch_head == "Multi-Version" and multi_branch_tail in branch_list: + log.info("Success to check branch:%s" % multi_branch) + else: + log.error("Failed!!! Make sure your Mukti-branch:%s" % multi_branch) + error_flag2 = "yes" + if branch_list and project_list: + log.info("multi_pro_head:%s" % multi_pro_head) + log.info("multi_pro_tail:%s" % multi_pro_tail.replace(':', '-')) + log.info("multi_branch_mid:%s" % multi_branch_mid) + log.info("project_list:%s" % project_list) + if multi_pro_tail.replace(':', '-') == multi_branch_mid and \ + error_flag2 != "yes" and multi_pro_head in project_list: + log.info("Success to check project:%s/%s" % (multi_branch, multi_project)) + else: + log.error("Failed!!! Make sure your Multi_branch and project:%s/%s" \ + % (multi_branch, multi_project)) + error_flag2 = "yes" + error_flag = error_flag1 or error_flag2 + return error_flag + + def _detect_protect_branch(self, change_path): + """ + check the pkg whether the branch is protected + """ + path_list = change_path.split('/') + if "multi" in path_list[0] or "Multi" in path_list[0]: + branch = path_list[1] + else: + branch = path_list[0] + pkg = path_list[-2] + api_url = "https://gitee.com/api/v5/repos/src-openeuler/%s" % pkg + \ + "/branches/%s?access_token=%s" % (branch, self.token) + attempts = 0 + success = False + while attempts < 5 and not success: + try: + response = requests.request(method='get', url=api_url) + protect_value = response.json()['protected'] + log.info("%s protected:%s" % (pkg, protect_value)) + success = True + if not protect_value: + log.error("ERROR:%s in %s is not protected branch" % (pkg, branch)) + return "yes" + else: + return + except Exception as e: + attempts += 1 + if attempts == 5: + break + log.error("***************Get Request From Gitee ERROR***************") + log.error("%s in %s can not get branch information from gitee" % (pkg, branch)) + log.error("***************Get Request From Gitee ERROR***************") + return "yes" + + def _get_all_change_file(self): + """ + get all change file_path """ self._clean() self._get_latest_obs_meta() - changelist = self._get_new_pkg() - self._check_pkg_services(changelist) + changefile = self._get_change_file() + changelist = [] + for msg in changefile: + parse_git = self._parse_git_log(msg) + if parse_git: + changelist.append(parse_git) + if changelist: + return changelist + else: + log.info("Finish!!There are no file need to check") + sys.exit() + + def _get_multi_branch_and_project(self, multi_change): + """ + get all the multi_version branch and one of the corresponding projects + """ + pro_dir = "multi_version/%s" % multi_change.split('/')[1] + branchs = os.listdir(os.path.join(self.current_path, \ + "obs_meta", "OBS_PRJ_meta/multi_version")) + projects = os.listdir(os.path.join(self.current_path, \ + "obs_meta", "OBS_PRJ_meta", pro_dir)) + log.info("multi_branch_list:%s" % branchs) + log.info("multi_project_list:%s" % projects) + multi_info = {'branchs':branchs, 'projects':projects} + return multi_info + + def _distinguish_check_service(self, change): + """ + Distingguish the handing of Multi branch from that of Service + """ + if "multi" in change.split('/')[0] or "Multi" in change.split('/')[0] and \ + len(change.split('/')) == 5: + multi_info = self._get_multi_branch_and_project(change) + if change.split('/')[1] in multi_info['branchs'] and \ + change.split('/')[2] in multi_info['projects']: + error_flag = self._check_pkg_services(change) + else: + log.error("Please check your commit dir %s!!!" % change) + error_flag = "yes" + elif len(change.split('/')) == 4: + error_flag = self._check_pkg_services(change) + else: + log.error("Please check your commit dir %s!!!" % change) + error_flag = "yes" + return error_flag + + def _check_service_meta(self, change_list): + """ + check the _service or _meta file in commit + """ + flag_list = [] + for change in change_list: + if "_service" in change: + error_flag = self._distinguish_check_service(change) + flag_list.append(error_flag) + elif "OBS_PRJ_meta" in change and "prjconf" not in change: + error_flag1 = None + if "multi" in change or "Multi" in change: + error_flag1 = self._multi_name_check(change) + error_flag2 = self._check_pro_meta(change) + error_flag = error_flag1 or error_flag2 + flag_list.append(error_flag) + elif "OBS_PRJ_meta" in change and "prjconf" in change: + continue + else: + log.info("There are no _service or _meta need to check") + os.chdir(self.current_path) + self._clean() + if "yes" in flag_list: + print("error_check_list:") + for error_service in self.check_error: + print(error_service) + raise SystemExit("*******PLEASE CHECK YOUR PR*******") + + def _check_branch_service(self, branch): + """ + check the file for the corresponding branch + """ + if "multi" in branch or "Multi" in branch: + branchlist = os.listdir(os.path.join(self.current_path, "obs_meta", + "OBS_PRJ_meta", "multi_version")) + else: + branchlist = os.listdir(os.path.join(self.current_path, "obs_meta", + "OBS_PRJ_meta")) + meta_path = os.path.join(self.current_path, "obs_meta") + os.chdir(meta_path) + if branch in branchlist: + if "multi" in branch or "Multi" in branch: + service = os.popen("find multi_version/%s | grep _service" % branch).read() + else: + service = os.popen("find %s | grep _service" % branch).read() + elif branch == "all": + service = os.popen("find | grep _service").read() + service_list = list(service.split('\n')) + ignore_branch = "openEuler-EPOL-LTS" + service_path_list = [x.lstrip("./") for x in service_list if x and ignore_branch not in x] + log.info(service_path_list) + self._check_service_meta(service_path_list) + + def do_all(self): + """ + make the get obs_meta change fuction and check fuction doing + """ + if self.prid and self.token: + change_result = self._get_all_change_file() + check_result = self._check_service_meta(change_result) + elif not self.prid and self.branch: + self._clean() + self._get_latest_obs_meta() + self._check_branch_service(self.branch) + else: + log.error("ERROR_INPUT:PLEASE CHECK YOU INPUT") + +if __name__ == "__main__": + kw = {} + kw['pr_id'] = "742" + kw['branch'] = "" + kw['token'] = "" + check = CheckMetaPull(**kw) + check.do_all() diff --git a/openeuler_obs.py b/openeuler_obs.py index 570647f..2071677 100644 --- a/openeuler_obs.py +++ b/openeuler_obs.py @@ -83,6 +83,8 @@ par.add_argument("-cc", "--check_codes", default=False, help="check codes same or not between gitee and obs,should be with -b and -p", required=False) par.add_argument("-cps", "--check_pkg_service", default=False, help="check if there are any problems with the content of the _service file in the rpm package", required=False) +par.add_argument("-at", "--access_token", default=False, + help="gitee access_token number used for api get", required=False) par.add_argument("-prid", "--pr_id", default=False, help="use the pr_id to get this pullrequest", required=False) par.add_argument("-sc", "--sync_code", default=True, @@ -133,6 +135,7 @@ kw = { "latest_info": args.latest_info, "check_codes": args.check_codes, "check_pkg_service": args.check_pkg_service, + "access_token": args.access_token, "pr_id": args.pr_id, "sync_code": args.sync_code, "all": args.ALL_, -- Gitee