5 Star 15 Fork 15

OceanBase / obdeploy

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
_repository.py 24.58 KB
一键复制 编辑 原始数据 按行查看 历史
Rongfeng Fu 提交于 2023-08-02 18:26 . V2.2.0 (#176)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
# coding: utf-8
# OceanBase Deploy.
# Copyright (C) 2021 OceanBase
#
# This file is part of OceanBase Deploy.
#
# OceanBase Deploy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# OceanBase Deploy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with OceanBase Deploy. If not, see <https://www.gnu.org/licenses/>.
from __future__ import absolute_import, division, print_function
import os
import sys
import time
import hashlib
from glob import glob
from multiprocessing import cpu_count
from multiprocessing.pool import Pool
from _rpm import Package, PackageInfo, Version
from _arch import getBaseArch
from _environ import ENV_DISABLE_PARALLER_EXTRACT
from tool import DirectoryUtil, FileUtil, YamlLoader, COMMAND_ENV
from _manager import Manager
from _plugin import InstallPlugin
class LocalPackage(Package):
class RpmObject(object):
def __init__(self, headers, files):
self.files = files
self.opens = {}
self.headers = headers
def __exit__(self, *arg, **kwargs):
for path in self.opens:
self.opens[path].close()
def __enter__(self):
self.__exit__()
self.opens = {}
return self
def extractfile(self, name):
if name not in self.files:
raise KeyError("member %s could not be found" % name)
path = self.files[name]
if path not in self.opens:
self.opens[path] = open(path, 'rb')
return self.opens[path]
def __init__(self, path, name, version, files, release=None, arch=None):
self.name = name
self.set_version(version)
self.set_release(release if release else time.strftime("%Y%m%d%H%M%S", time.localtime(time.time())))
self.md5 = None
self.arch = arch if arch else getBaseArch()
self.headers = {}
self.files = self.get_all_files(files)
self.path = path
self.package()
def __hash__(self):
return hash(self.path)
@staticmethod
def get_all_files(source_files):
files = {}
for src_path, target_path in source_files.items():
if not os.path.isdir(target_path) or os.path.islink(target_path):
files[src_path] = target_path
else:
files[src_path+'/'] = target_path
for fp in LocalPackage.list_dir(target_path):
files[os.path.join(src_path, os.path.relpath(fp, target_path))] = fp
return files
@staticmethod
def list_dir(path):
files = []
for fn in os.listdir(path):
fp = os.path.join(path, fn)
if not os.path.isdir(fp) or os.path.islink(fp):
files.append(fp)
else:
files += LocalPackage.list_dir(fp)
return files
def package(self):
count = 0
dirnames = []
filemd5s = []
filemodes = []
basenames = []
dirindexes = []
filelinktos = []
dirnames_map = {}
m_sum = hashlib.md5()
for src_path in self.files:
target_path = self.files[src_path]
dirname, basename = os.path.split(src_path)
if dirname not in dirnames_map:
dirnames.append(dirname)
dirnames_map[dirname] = count
count += 1
if basename:
basenames.append(basename)
dirindexes.append(dirnames_map[dirname])
if os.path.islink(target_path):
filemd5s.append('')
filelinktos.append(os.readlink(target_path))
filemodes.append(-24065)
else:
m_value = FileUtil.checksum(target_path)
m_sum.update(m_value)
filemd5s.append(m_value)
filelinktos.append('')
filemodes.append(os.stat(target_path).st_mode)
self.headers = {
'dirnames': dirnames,
'filemd5s': filemd5s,
'filemodes': filemodes,
'basenames': basenames,
'dirindexes': dirindexes,
'filelinktos': filelinktos,
}
self.md5 = m_sum.hexdigest()
def open(self):
return self.RpmObject(self.headers, self.files)
class ExtractFileInfo(object):
def __init__(self, src_path, target_path, mode):
self.src_path = src_path
self.target_path = target_path
self.mode = mode
class Extractor(object):
def __init__(self, pkg, files, stdio=None):
self.pkg = pkg
self.files = files
self.stdio = stdio
def extract(self):
with self.pkg.open() as rpm:
for info in self.files:
if os.path.exists(info.target_path):
continue
fd = rpm.extractfile(info.src_path)
with FileUtil.open(info.target_path, 'wb', stdio=self.stdio) as f:
FileUtil.copy_fileobj(fd, f)
if info.mode != 0o744:
os.chmod(info.target_path, info.mode)
return True
class ParallerExtractor(object):
MAX_PARALLER = cpu_count() * 2 if cpu_count() else 8
MAX_SIZE = 100
MIN_SIZE = 20
def __init__(self, pkg, files, stdio=None):
self.pkg = pkg
self.files = files
self.stdio = stdio
@staticmethod
def _extract(worker):
return worker.extract()
def extract(self):
if not self.files:
return
if sys.version_info.major == 2 or COMMAND_ENV.get(ENV_DISABLE_PARALLER_EXTRACT, False):
return self._single()
else:
return self._paraller()
def _single(self):
self.stdio and getattr(self.stdio, 'verbose', print)('extract mode: single')
return Extractor(
self.pkg,
self.files,
stdio=self.stdio
).extract()
def _paraller(self):
self.stdio and getattr(self.stdio, 'verbose', print)('extract mode: paraller')
workers = []
file_num = len(self.files)
paraller = int(min(self.MAX_PARALLER, file_num))
size = min(self.MAX_SIZE, int(file_num / paraller)) #
size = int(max(self.MIN_SIZE, size))
index = 0
while index < file_num:
p_index = index + size
workers.append(Extractor(
self.pkg,
self.files[index:p_index],
stdio=self.stdio
))
index = p_index
pool = Pool(processes=paraller)
try:
results = pool.map(ParallerExtractor._extract, workers)
for r in results:
if not r:
return False
return True
except KeyboardInterrupt:
if pool:
pool.close()
pool = None
except:
self.stdio and getattr(self.stdio, 'exception', print)('')
finally:
pool and pool.close()
return False
class Repository(PackageInfo):
_DATA_FILE = '.data'
def __init__(self, name, repository_dir, stdio=None):
self.repository_dir = repository_dir
super(Repository, self).__init__(name, None, None, None, None)
self.stdio = stdio
self._load()
@property
def hash(self):
return self.md5
def __str__(self):
return '%s-%s-%s-%s' % (self.name, self.version, self.release, self.hash)
def __hash__(self):
return hash(self.repository_dir)
def is_shadow_repository(self):
if os.path.exists(self.repository_dir):
return os.path.islink(self.repository_dir)
return False
@property
def data_file_path(self):
path = os.readlink(self.repository_dir) if os.path.islink(self.repository_dir) else self.repository_dir
return os.path.join(path, Repository._DATA_FILE)
def bin_list(self, plugin):
files = []
if self.version and self.hash:
for file_item in plugin.file_list(self):
if file_item.type == InstallPlugin.FileItemType.BIN:
files.append(os.path.join(self.repository_dir, file_item.target_path))
return files
def file_list(self, plugin):
files = []
if self.version and self.hash:
for file_item in plugin.file_list(self):
path = os.path.join(self.repository_dir, file_item.target_path)
if file_item.type == InstallPlugin.FileItemType.DIR:
files += DirectoryUtil.list_dir(path)
else:
files.append(path)
return files
def file_check(self, plugin):
for file_path in self.file_list(plugin):
if not os.path.exists(file_path):
return False
return True
def __eq__(self, other):
if isinstance(other, self.__class__):
return self.version == other.version and self.hash == other.hash
if isinstance(other, dict):
return self.version == other['version'] and self.hash == other['hash']
return super(Repository, self).__eq__(other)
def _load(self):
try:
with open(self.data_file_path, 'r') as f:
data = YamlLoader().load(f)
self.set_version(data.get('version'))
self.set_release(data.get('release'))
self.md5 = data.get('hash')
self.arch = data.get('arch')
self.install_time = data.get('install_time', 0)
except:
pass
def _parse_path(self):
if self.is_shadow_repository():
path = os.readlink(self.repository_dir)
else:
path = self.repository_dir
path = path.strip('/')
path, _hash = os.path.split(path)
path, version = os.path.split(path)
if not self.version:
self.set_version(version)
def _dump(self):
data = {'version': self.version, 'hash': self.hash, 'release': self.release, 'arch': self.arch}
if self.install_time:
data['install_time'] = self.install_time
try:
with open(self.data_file_path, 'w') as f:
YamlLoader().dump(data, f)
return True
except:
self.stdio and getattr(self.stdio, 'exception', print)('dump %s to %s failed' % (data, self.data_file_path))
return False
def need_load(self, pkg, plugin):
return self.hash != pkg.md5 or not self.install_time > plugin.check_value or not self.file_check(plugin)
def load_pkg(self, pkg, plugin):
if self.is_shadow_repository():
self.stdio and getattr(self.stdio, 'print', '%s is a shadow repository' % self)
return False
self.clear()
try:
with pkg.open() as rpm:
file_map = plugin.file_map(pkg)
need_dirs = {}
need_files = {}
for src_path in file_map:
file_item = file_map[src_path]
if file_item.type == InstallPlugin.FileItemType.DIR:
if not src_path.endswith('/'):
src_path += '/'
need_dirs[src_path] = file_item.target_path
else:
need_files[src_path] = file_item.target_path
files = {}
links = {}
dirnames = rpm.headers.get("dirnames")
basenames = rpm.headers.get("basenames")
dirindexes = rpm.headers.get("dirindexes")
filelinktos = rpm.headers.get("filelinktos")
filemd5s = rpm.headers.get("filemd5s")
filemodes = rpm.headers.get("filemodes")
dirs = sorted(need_dirs.keys(), reverse=True)
format_str = lambda s: s.decode(errors='replace') if isinstance(s, bytes) else s
for i in range(len(basenames)):
if not filemd5s[i] and not filelinktos[i]:
continue
dir_path = format_str(dirnames[dirindexes[i]])
if not dir_path.startswith('./'):
dir_path = '.%s' % dir_path
file_name = format_str(basenames[i])
path = os.path.join(dir_path, file_name)
files[path] = i
if path not in need_files:
for n_dir in need_dirs:
if path.startswith(n_dir):
need_files[path] = os.path.join(need_dirs[n_dir], path[len(n_dir):])
break
need_extract_files = []
for src_path in need_files:
if src_path not in files:
raise Exception('%s not found in packge' % src_path)
target_path = os.path.join(self.repository_dir, need_files[src_path])
if os.path.exists(target_path):
return
idx = files[src_path]
if filemd5s[idx]:
need_extract_files.append(ExtractFileInfo(
src_path,
target_path,
filemodes[idx] & 0x1ff
))
elif filelinktos[idx]:
links[target_path] = filelinktos[idx]
else:
raise Exception('%s is directory' % src_path)
ParallerExtractor(pkg, need_extract_files, stdio=self.stdio).extract()
for link in links:
self.stdio and getattr(self.stdio, 'verbose', print)('link %s to %s' % (links[link], link))
os.symlink(links[link], link)
for n_dir in need_dirs:
path = os.path.join(self.repository_dir, need_dirs[n_dir])
if not os.path.exists(path) and n_dir[:-1] in dirnames:
DirectoryUtil.mkdir(path)
if not os.path.isdir(path):
raise Exception('%s in %s is not dir.' % (n_dir, pkg.path))
self.set_version(pkg.version)
self.set_release(pkg.release)
self.md5 = pkg.md5
self.arch = pkg.arch
self.install_time = time.time()
if self._dump():
return True
else:
self.clear()
except:
self.stdio and getattr(self.stdio, 'exception', print)('failed to extract file from %s' % pkg.path)
self.clear()
return False
def clear(self):
if os.path.exists(self.repository_dir):
return DirectoryUtil.rm(self.repository_dir, self.stdio) and DirectoryUtil.mkdir(self.repository_dir, stdio=self.stdio)
return True
class RepositoryVO(object):
def __init__(self, name, version, release, arch, md5, path, tags=[]):
self.name = name
self.version = version
self.release = release
self.arch = arch
self.md5 = md5
self.path = path
self.tags = tags
class ComponentRepository(object):
def __init__(self, name, repository_dir, stdio=None):
self.repository_dir = repository_dir
self.stdio = stdio
self.name = name
DirectoryUtil.mkdir(self.repository_dir, stdio=stdio)
def get_instance_repositories(self, version):
repositories = {}
for tag in os.listdir(self.repository_dir):
path = os.path.join(self.repository_dir, tag)
if os.path.islink(path):
continue
repository = Repository(self.name, path, self.stdio)
if repository.hash:
repositories[repository.hash] = repository
return repositories
def get_shadow_repositories(self, version, instance_repositories={}):
repositories = {}
for tag in os.listdir(self.repository_dir):
path = os.path.join(self.repository_dir, tag)
if not os.path.islink(path):
continue
_, md5 = os.path.split(os.readlink(path))
if md5 in instance_repositories:
repositories[tag] = instance_repositories[md5]
else:
repository = Repository(self.name, path, self.stdio)
if repository.hash:
repositories[repository.hash] = repository
return repositories
def search_repository(self, version=None, tag=None, release=None):
path_pattern = os.path.join(self.repository_dir, version or '*', tag or '*')
repository = None
for path in glob(path_pattern):
n_repository = Repository(self.name, path, self.stdio)
if release and release != n_repository.release:
continue
if n_repository.hash and n_repository > repository:
repository = n_repository
return repository
def get_repository(self, version=None, tag=None, release=None):
if version or tag or release:
return self.search_repository(version=version, tag=tag, release=release)
else:
return self.search_repository(tag=self.name) or self.search_repository()
def get_repositories(self, version=None):
if not version:
version = '*'
repositories = []
path_pattern = os.path.join(self.repository_dir, version, '*')
for path in glob(path_pattern):
repository = Repository(self.name, path, self.stdio)
if repository.hash:
repositories.append(repository)
return repositories
class RepositoryManager(Manager):
RELATIVE_PATH = 'repository'
# repository目录结构为 ./repository/{component_name}/{version}/{tag or hash}
def __init__(self, home_path, lock_manager=None, stdio=None):
super(RepositoryManager, self).__init__(home_path, stdio=stdio)
self.repositories = {}
self.component_repositories = {}
self.lock_manager = lock_manager
def _lock(self, read_only=False):
if self.lock_manager:
if read_only:
return self.lock_manager.mirror_and_repo_sh_lock()
else:
return self.lock_manager.mirror_and_repo_ex_lock()
return True
def _get_repository_vo(self, repository):
return RepositoryVO(
repository.name,
repository.version,
repository.release,
repository.arch,
repository.md5,
repository.repository_dir,
[]
)
def get_repositories(self, name, version=None, instance=True):
repositories = []
for repository in self.get_component_repository(name).get_repositories(version):
if instance and repository.is_shadow_repository() is False:
repositories.append(repository)
return repositories
def get_repositories_view(self, name=None):
if name:
repositories = self.get_component_repository(name).get_repositories()
else:
repositories = []
path_pattern = os.path.join(self.path, '*')
for path in glob(path_pattern):
_, name = os.path.split(path)
repositories += self.get_component_repository(name).get_repositories()
repositories_vo = {}
for repository in repositories:
if repository.is_shadow_repository():
repository_ist = self.get_instance_repository_from_shadow(repository)
if repository_ist not in repositories_vo:
repositories_vo[repository_ist] = self._get_repository_vo(repository)
_, tag = os.path.split(repository.repository_dir)
repositories_vo[repository_ist].tags.append(tag)
elif repository not in repositories_vo:
repositories_vo[repository] = self._get_repository_vo(repository)
return list(repositories_vo.values())
def get_component_repository(self, name):
if name not in self.component_repositories:
self._lock(True)
path = os.path.join(self.path, name)
self.component_repositories[name] = ComponentRepository(name, path, self.stdio)
return self.component_repositories[name]
def get_repository(self, name, version=None, tag=None, release=None, package_hash=None, instance=True):
self.stdio.verbose(
"Search repository {name} version: {version}, tag: {tag}, release: {release}, package_hash: {package_hash}".format(
name=name, version=version, tag=tag, release=release, package_hash=package_hash))
tag = tag or package_hash
component_repository = self.get_component_repository(name)
if version and tag:
repository_dir = os.path.join(self.path, name, version, tag)
if repository_dir in self.repositories:
repository = self.repositories[repository_dir]
else:
repository = component_repository.get_repository(version=version, tag=tag, release=release)
else:
repository = component_repository.get_repository(version=version, tag=tag, release=release)
if not repository:
return None
else:
if repository.repository_dir not in self.repositories:
self.repositories[repository.repository_dir] = repository
else:
repository = self.repositories[repository.repository_dir]
if not self._check_repository_pattern(repository, version=version, release=release, hash=package_hash):
return None
self.stdio.verbose("Found repository {}".format(repository))
return self.get_instance_repository_from_shadow(repository) if instance else repository
def _check_repository_pattern(self, repository, **kwargs):
for key in ["version", "release", "hash"]:
current_value = getattr(repository, key)
if kwargs.get(key) is not None and current_value != kwargs[key]:
self.stdio.verbose("repository {} is {}, but {} is required".format(key, current_value, kwargs[key]))
return False
return True
def create_instance_repository(self, name, version, _hash):
path = os.path.join(self.path, name, version, _hash)
if path not in self.repositories:
self._lock()
self._mkdir(path)
repository = Repository(name, path, self.stdio)
self.repositories[path] = repository
return self.repositories[path]
def get_repository_allow_shadow(self, name, version, tag=None):
path = os.path.join(self.path, name, version, tag if tag else name)
if os.path.exists(path):
if path not in self.repositories:
self._lock(True)
self.repositories[path] = Repository(name, path, self.stdio)
return self.repositories[path]
repository = Repository(name, path, self.stdio)
repository.set_version(version)
return repository
def create_tag_for_repository(self, repository, tag, force=False):
if repository.is_shadow_repository():
return False
self._lock()
path = os.path.join(self.path, repository.name, repository.version, tag)
if os.path.exists(path):
if not os.path.islink(path):
return False
src_path = os.readlink(path)
if os.path.normcase(src_path) == os.path.normcase(repository.repository_dir):
return True
if not force:
return False
DirectoryUtil.rm(path)
try:
os.symlink(repository.repository_dir, path)
return True
except:
pass
return False
def get_instance_repository_from_shadow(self, repository):
if not isinstance(repository, Repository) or not repository.is_shadow_repository():
return repository
self._lock(True)
try:
path = os.readlink(repository.repository_dir)
if path not in self.repositories:
self.repositories[path] = Repository(repository.name, path, self.stdio)
return self.repositories[path]
except:
pass
return None
Python
1
https://gitee.com/oceanbase/obdeploy.git
git@gitee.com:oceanbase/obdeploy.git
oceanbase
obdeploy
obdeploy
master

搜索帮助