41 Star 26 Fork 101

openEuler / openeuler-jenkins

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
check_repo.py 16.47 KB
一键复制 编辑 原始数据 按行查看 历史
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
# -*- encoding=utf-8 -*-
"""
# ***********************************************************************************
# Copyright (c) Huawei Technologies Co., Ltd. 2020-2020. All rights reserved.
# [openeuler-jenkins] is licensed under the Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Author: DisNight
# Create: 2020-09-22
# Description: check yaml file in software package
# ***********************************************************************************/
"""
# \ / /| /| /| /
# \ / /_| / | / | /
# / / | / | / | /
# / / | / |/ | /_____
import abc
import json
import logging
import os
import re
import subprocess
import urllib.parse as urlparse
import requests
import tldextract
# Look up (and thereby register) the logger named "ac". The returned logger
# object is not kept; the classes below log through the module-level
# ``logging`` functions instead.
logging.getLogger("ac")
class AbsReleaseTags(object, metaclass=abc.ABCMeta):
    """
    Abstract base class for the release-tag getters.

    Concrete subclasses must implement ``url`` and ``get_tags``. The metaclass is
    declared with the Python 3 ``metaclass=`` keyword so that the
    ``abc.abstractmethod`` markers are actually enforced (a ``__metaclass__``
    class attribute is ignored by Python 3).
    """

    def __init__(self, version_control):
        """
        Remember which version control type this getter was created for.

        version_control: value stored on the instance as ``self.version_control``
        """
        self.version_control = version_control

    @abc.abstractmethod
    def url(self, repo):
        """
        Abstract method: build the url to query from ``repo``.
        """

    @abc.abstractmethod
    def get_tags(self, repo):
        """
        Abstract method: fetch the release tags for ``repo``.
        """
class DefaultReleaseTags(AbsReleaseTags):
    """
    Fallback release-tag getter: it builds no url and reports no tags.
    """

    def url(self, repo):
        """
        Build the url for ``repo``; this implementation always yields an empty string.
        return: str
        """
        empty_url = ""
        return empty_url

    def get_tags(self, repo):
        """
        Log that the version control type is unsupported and report no tags.
        return: list
        """
        unsupported = self.version_control
        logging.info("unsupported version control: %s", unsupported)
        return list()
class HttpReleaseTagsMixin(object):
    """
    Mixin that fetches release-tag pages through web requests.
    """
    DEFAULT_REQ_HEADER = {
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64)'
    }

    def get_redirect_resp(self, url, response):
        """
        Detect a script-driven redirect in the body of ``response``.

        Every line of ``response.text`` is inspected:
          * a line starting with "Redirecting" marks the page as a redirect;
          * ``document.cookie="...";`` contributes ';'-separated cookie entries;
          * ``document.location.href="...";`` supplies the redirect target
            (the last match wins).

        url: url that produced ``response``; the target is resolved against it
        response: object exposing the page body as ``response.text``
        return: bool, str, list (need_redirect, new url, sorted cookie strings)
        """
        cookie = set()
        href = ""
        need_redirect = False
        for line in response.text.splitlines():
            line = line.strip()
            if line.startswith("Redirecting"):
                logging.debug("Redirecting with document.cookie")
                need_redirect = True
            # Non-greedy groups: stop at the first '";' so that two assignments
            # written on the same line are not merged into one capture.
            search_result = re.search(r"document\.cookie=\"(.*?)\";", line)
            if search_result:
                # Strip each entry so "a=1; b=2" yields "b=2" rather than " b=2".
                cookie.update(part.strip() for part in search_result.group(1).split(';'))
            search_result = re.search(r"document\.location\.href=\"(.*?)\";", line)
            if search_result:
                href = search_result.group(1)
        new_url = urlparse.urljoin(url, href)
        cookie.discard("")
        return need_redirect, new_url, sorted(cookie)

    @staticmethod
    def _cookies_to_dict(cookies):
        """
        Convert "key=value" strings into a dict suitable for ``requests``.

        Only the first '=' separates key from value, so values that themselves
        contain '=' are preserved. Entries without '=' or without a key are skipped.
        return: dict
        """
        cookie_dict = {}
        for cookie in cookies:
            key, sep, val = cookie.partition("=")
            key = key.strip()
            if not sep or not key:
                continue
            cookie_dict[key] = val.strip()
        return cookie_dict

    def get_request_response(self, url, timeout=30, headers=None):
        """
        Request ``url`` and, if the page asks for it, follow one script-driven redirect.

        url: address to fetch
        timeout: timeout in seconds handed to ``requests.get``
        headers: request headers; ``DEFAULT_REQ_HEADER`` is used when None
        return: response object on success, "" when the request fails or the
                redirect target is on a different domain
        """
        headers = self.DEFAULT_REQ_HEADER if headers is None else headers
        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            need_redirect, new_url, cookies = self.get_redirect_resp(url, response)
            # Check that the domain is unchanged, to guard against csrf attacks.
            if tldextract.extract(url).domain != tldextract.extract(new_url).domain:
                logging.warning("domain of redirection link is different: %s", new_url)
                return ""
            if need_redirect:
                cookie_dict = self._cookies_to_dict(cookies)
                url = new_url
                response = requests.get(url, headers=headers, cookies=cookie_dict, timeout=timeout)
        except requests.exceptions.SSLError as e:
            logging.warning("requests %s ssl exception, %s", url, e)
            return ""
        except requests.exceptions.Timeout:
            logging.warning("requests timeout")
            return ""
        except requests.exceptions.RequestException as e:
            logging.warning("requests exception, %s", e)
            return ""
        return response
class HgReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for hg upstream communities (``json-tags`` endpoint).
    """

    def url(self, repo):
        """
        Generate the url from src_repo by appending "json-tags".
        return: str ("" when ``repo`` is empty)
        """
        return urlparse.urljoin(repo + "/", "json-tags") if repo else ""

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        The tags are returned ordered by the first element of each tag's
        "date" field, largest first.
        return: list ([] on any failure)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        if not response:
            logging.warning("unable to get response:")
            return []
        try:
            tags_json = json.loads(response.text)
            # all() accepts a single iterable; calling it with two arguments
            # raised TypeError, so the two conditions are tested explicitly.
            if not tags_json or not isinstance(tags_json, dict):
                return []
            temp_tags = sorted(tags_json["tags"], key=lambda x: x["date"][0], reverse=True)
            release_tags = [tag["tag"] for tag in temp_tags]
        except (json.JSONDecodeError, IOError, KeyError, IndexError, TypeError) as e:
            # TypeError covers a "tags" payload that is not a list of dicts.
            logging.error("exception, %s", e)
            return []
        return release_tags
class HgRawReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for hg upstream communities (``raw-tags`` endpoint).
    """

    def url(self, repo):
        """
        Generate the url from src_repo by appending "raw-tags".
        return: str ("" when ``repo`` is empty)
        """
        return urlparse.urljoin(repo + "/", "raw-tags") if repo else ""

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        The first whitespace-separated field of every non-blank response line
        is taken as a tag.
        return: list ([] when the url is empty or the request failed)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        # get_request_response returns "" on failure, which has no .text.
        if not response:
            logging.warning("unable to get response: %s", url)
            return []
        release_tags = []
        for line in response.text.splitlines():
            fields = line.split()
            # Blank lines split into an empty list; skip them rather than
            # raising IndexError.
            if fields:
                release_tags.append(fields[0])
        return release_tags
class MetacpanReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for the metacpan upstream community.
    """

    def url(self, repo):
        """
        Generate the url from src_repo, relative to https://metacpan.org/release/.
        return: str ("" when ``repo`` is empty)
        """
        return urlparse.urljoin("https://metacpan.org/release/", repo) if repo else ""

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        Whenever a response line contains ``value="/release``, the following
        line is taken as the tag, unless that following line contains "DEV".
        return: list ([] when the url is empty or the request failed)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        if not response:
            return []
        resp_lines = response.text.splitlines()
        release_tags = []
        tag_condition = "value=\"/release"
        # Walk (line, following line) pairs. The former manual "index += 1"
        # had no effect inside a range() loop and has been dropped.
        for marker_line, tag_line in zip(resp_lines, resp_lines[1:]):
            if tag_condition not in marker_line:
                continue
            if "DEV" in tag_line:
                continue
            release_tags.append(tag_line.strip())
        return release_tags
class PypiReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for the pypi upstream community.
    """

    def url(self, repo):
        """
        Generate the url of the pypi JSON document for src_repo.

        The previous implementation called ``urljoin`` with a single argument,
        which raises TypeError; the url is now assembled directly.
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        return "https://pypi.org/pypi/{}/json".format(repo.strip("/"))

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        The tags are the keys of the "releases" mapping in the JSON document.
        return: list ([] on any failure)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        # get_request_response returns "" on failure, which has no .json().
        if not response:
            logging.warning("unable to get response: %s", url)
            return []
        try:
            tags_json = response.json()
        except ValueError as e:
            # ValueError is the common base of UnicodeDecodeError and
            # json.JSONDecodeError.
            logging.error("exception, %s", e)
            return []
        if not isinstance(tags_json, dict):
            return []
        return list(tags_json.get("releases") or [])
class RubygemReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for the rubygem upstream community.
    """

    def url(self, repo):
        """
        Generate the url of the rubygems versions JSON document for src_repo.
        return: str ("" when ``repo`` is empty)
        """
        return urlparse.urljoin("https://rubygems.org/api/v1/versions/", repo + ".json") if repo else ""

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        Each element of the JSON list contributes its non-empty "number" value.
        return: list ([] on any failure)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        # get_request_response returns "" on failure, which has no .json().
        if not response:
            logging.warning("unable to get response: %s", url)
            return []
        try:
            tags_json = response.json()
        except ValueError as e:
            # ValueError is the common base of UnicodeDecodeError and
            # json.JSONDecodeError.
            logging.error("exception, %s", e)
            return []
        if not isinstance(tags_json, list):
            return []
        return [
            element.get("number")
            for element in tags_json
            if isinstance(element, dict) and element.get("number")
        ]
class GnuftpReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for the gnu-ftp upstream community.
    """

    def url(self, repo):
        """
        Generate the url from src_repo, relative to https://ftp.gnu.org/gnu/.
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        return urlparse.urljoin("https://ftp.gnu.org/gnu/", repo)

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        The href target of the anchor matched on each response line is
        collected as a tag.
        return: list
        """
        target = self.url(repo)
        logging.debug("%s : get %s tags", target, self.version_control)
        if not target:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(target)
        anchor = re.compile("href=\"(.*)\">(.*)</a>")
        if not response:
            return []
        hits = (anchor.search(text_line) for text_line in response.text.splitlines())
        return [hit.group(1) for hit in hits if hit]
class FtpReleaseTags(AbsReleaseTags, HttpReleaseTagsMixin):
    """
    Release-tag getter for ftp upstream communities.
    """

    def url(self, repo):
        """
        Generate the url from src_repo.

        NOTE(review): the base 'ftp' carries no scheme or host, so the result
        appears to be essentially ``repo + "/"`` -- confirm this is intended.
        return: str ("" when ``repo`` is empty)
        """
        return urlparse.urljoin('ftp', repo + "/") if repo else ""

    def get_tags(self, repo):
        """
        Fetch the upstream release tags through the url.

        The href target of the anchor matched on each response line is
        collected as a tag.
        return: list ([] when the url is empty or the request failed)
        """
        url = self.url(repo)
        logging.debug("%s : get %s tags", url, self.version_control)
        if not url:
            logging.warning("illegal url: \"\"")
            return []
        response = self.get_request_response(url)
        # get_request_response returns "" on failure, which has no .text.
        if not response:
            return []
        pattern = re.compile("href=\"(.*)\">(.*)</a>")
        release_tags = []
        for line in response.text.splitlines():
            search_result = pattern.search(line)
            if search_result:
                release_tags.append(search_result.group(1))
        return release_tags
class CmdReleaseTagsMixin(object):
    """
    Mixin that obtains upstream release tags by running shell commands.
    """

    def get_cmd_response(self, cmd_list):
        """
        Run a command and return what it wrote to stdout.

        cmd_list: command and its arguments as a list of strings
        return: str, the utf-8 decoded stdout; "" when the command could not
                be started. A non-zero exit status is logged as a warning and
                the captured output is still returned.
        """
        try:
            sub_proc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
        except OSError as e:
            # e.g. the executable is missing or not executable
            logging.warning("%s > unable to execute, %s", " ".join(cmd_list), e)
            return ""
        # communicate() reads stdout to EOF, waits for the process to exit and
        # closes the pipe, so no file descriptor is left open.
        stdout_data, _ = sub_proc.communicate()
        response = stdout_data.decode("utf-8")
        if sub_proc.returncode:
            logging.warning("%s > encount errors", " ".join(cmd_list))
        return response
class SvnReleaseTags(AbsReleaseTags, CmdReleaseTagsMixin):
    """
    Release-tag getter that queries upstream through the svn shell command.
    """

    def url(self, repo):
        """
        Generate the url from src_repo by appending "tags".
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        return urlparse.urljoin(repo + "/", "tags")

    def get_response(self, url):
        """
        Build the ``svn ls -v`` command for ``url`` and return its output.
        return: response
        """
        return self.get_cmd_response(["/usr/bin/svn", "ls", "-v", url])

    def get_tags(self, repo):
        """
        Fetch the upstream release tags by running the shell command against the remote.

        For every output line, the first whitespace-separated field ending in
        "/" is recorded as a tag, without the trailing slash.
        return: list
        """
        target = self.url(repo)
        logging.debug("%s : get svn tags", target)
        if not target:
            logging.warning("illegal url: \"\"")
            return []
        release_tags = []
        for output_line in self.get_response(target).splitlines():
            dir_field = next(
                (field for field in output_line.split() if field.endswith("/")),
                None,
            )
            if dir_field is not None:
                release_tags.append(dir_field[:-1])
        return release_tags
class GitReleaseTags(AbsReleaseTags, CmdReleaseTagsMixin):
    """
    Release-tag getter that queries upstream through the git shell command.
    """

    def url(self, repo):
        """
        Generate the url from src_repo; the repo value is used unchanged.
        return: str
        """
        return repo

    def get_response(self, url):
        """
        Build the ``git ls-remote --tags`` command for ``url`` and return its output.
        return: response
        """
        return self.get_cmd_response(["git", "ls-remote", "--tags", url])

    def trans_reponse_tags(self, reponse):
        """
        Extract tag names from the git command output.

        The name following "refs/tags/" is taken from each matching line;
        names ending in "^{}" are left out.
        return: list
        """
        ref_line = re.compile(r"^([^ \t]*)[ \t]*refs\/tags\/([^ \t]*)")
        matched = (ref_line.match(text_line) for text_line in reponse.splitlines())
        tag_names = (found.group(2) for found in matched if found)
        return [name for name in tag_names if not name.endswith("^{}")]

    def get_tags(self, repo):
        """
        Fetch the upstream release tags by running the shell command against the remote.
        return: list
        """
        target = self.url(repo)
        logging.debug("%s : get %s tags", target, self.version_control)
        if not target:
            logging.warning("illegal url: \"\"")
            return []
        return self.trans_reponse_tags(self.get_response(target))
class GithubReleaseTags(GitReleaseTags):
    """
    Release-tag getter for the github upstream community.
    """

    def url(self, repo):
        """
        Generate the url from src_repo: ``repo`` plus ".git", relative to https://github.com/.
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        git_path = repo + ".git"
        return urlparse.urljoin("https://github.com/", git_path)
class GiteeReleaseTags(GitReleaseTags):
    """
    Release-tag getter for the gitee upstream community.
    """

    def url(self, repo):
        """
        Generate the url from src_repo, relative to https://gitee.com/.
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        return urlparse.urljoin("https://gitee.com/", repo)
class GitlabReleaseTags(GitReleaseTags):
    """
    Release-tag getter for the gitlab.gnome upstream community.
    """

    def url(self, repo):
        """
        Generate the url from src_repo.

        A repo without any "/" is placed under the GNOME/ group; otherwise the
        repo path is used directly under https://gitlab.gnome.org/.
        return: str ("" when ``repo`` is empty)
        """
        if not repo:
            return ""
        if "/" in repo:
            base = "https://gitlab.gnome.org/"
        else:
            base = "https://gitlab.gnome.org/GNOME/"
        return urlparse.urljoin(base, repo + ".git")
class ReleaseTagsFactory(object):
    """
    Factory for ReleaseTags objects and their subclasses.
    """
    VERSION_CTRL_GETTER_MAPPING = {
        "hg": HgReleaseTags,
        "hg-raw": HgRawReleaseTags,
        "github": GithubReleaseTags,
        "git": GitReleaseTags,
        "gitlab.gnome": GitlabReleaseTags,
        "svn": SvnReleaseTags,
        "metacpan": MetacpanReleaseTags,
        "pypi": PypiReleaseTags,
        "rubygem": RubygemReleaseTags,
        "gitee": GiteeReleaseTags,
        "gnu-ftp": GnuftpReleaseTags,
        "ftp": FtpReleaseTags
    }

    @staticmethod
    def get_release_tags(version_control):
        """
        Pick the getter class registered for ``version_control`` and instantiate it.

        Names missing from the mapping fall back to DefaultReleaseTags.
        return: instance of the selected class, constructed with ``version_control``
        """
        getters = ReleaseTagsFactory.VERSION_CTRL_GETTER_MAPPING
        getter_cls = getters.get(version_control, DefaultReleaseTags)
        return getter_cls(version_control)
1
https://gitee.com/openeuler/openeuler-jenkins.git
git@gitee.com:openeuler/openeuler-jenkins.git
openeuler
openeuler-jenkins
openeuler-jenkins
master

搜索帮助