1 Star 1 Fork 2

日行一善/ScriptTools

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
rds_backups_oss.py 12.18 KB
一键复制 编辑 原始数据 按行查看 历史
日行一善 提交于 2021-12-07 16:02 +08:00 . update python/rds_backups_oss.py.
#!/usr/bin/python3
#
# 日期:2020/8/15
# 介绍:
# 1.RDS全量+增量备份脚本,全量推荐每周备份,增量推荐每小时备份
# 2.脚本会获取最近一周内RDS自动备份中最新的一次全量备份
# 3.增量备份会根据上次全备份的时间来获取直到现在的所有binlog,确保可以恢复到任意时间点
# 4.备份开始时会检测是否在OSS中已存在文件,存在会跳过
#
# 适用:python3
# 语言:中文
#
# 执行:python3.6 rds-bak.py allbak|binlog
# 注意:
# 1.使用前先修改配置脚本的一些参数
# 2.备份时会占用一些临时存储用在下载中转,通过内网操作不占用公网带宽但会增大磁盘读写IO
# 3.脚本执行时间过长会导致脚本中断退出,可以重复执行脚本来确保备份完成
# 4.使用前先用pip3安装oss2、aliyun-python-sdk-core、aliyun-python-sdk-rds
# 5.使用前先用yum安装python-devel
import os, json, datetime, oss2, sys, shutil, requests, configparser
from oss2 import SizedFileAdapter, determine_part_size
from oss2.models import PartInfo
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException
from aliyunsdkcore.acs_exception.exceptions import ServerException
from aliyunsdkrds.request.v20140815.DescribeBackupsRequest import DescribeBackupsRequest
from aliyunsdkrds.request.v20140815.DescribeBinlogFilesRequest import DescribeBinlogFilesRequest
# [Configuration: access key id, access key secret, region of the RDS instances,
#  list of RDS instance ids to back up, OSS bucket name, local backup directory]
# NOTE(review): the credentials below are hard-coded placeholders; consider loading
# real keys from the environment or a config file instead of committing them.
aliyun_user_ak = 'xxxxxxx'
aliyun_user_sk = 'xxxxxxxx'
region_id = 'cn-beijing'
rds_list = ['rm-2ze53dsafdsa','rm-2zfsdxsjgd','rm-gdcskuyj787o3r2']
oss_bucket = "db-bak"
rdsbak_dir = "/rdsbak"
# [Create the API client, create the OSS auth object, build the internal
#  ("-internal") OSS endpoint for the region, open the bucket handle]
# These run at import time, so importing this module already constructs the clients.
client = AcsClient(ak=aliyun_user_ak, secret=aliyun_user_sk, region_id=region_id, timeout=300)
auth = oss2.Auth(aliyun_user_ak, aliyun_user_sk)
intranet_addr = "http://oss-" + region_id + "-internal.aliyuncs.com"
bucket = oss2.Bucket(auth, intranet_addr, oss_bucket)
# [Current time, time seven days ago, query window start, query window end]
# NOTE(review): nowtime is naive local time but is formatted with a literal 'Z'
# suffix; if the API treats these values as UTC the window is shifted by the
# host's UTC offset -- confirm.
nowtime = datetime.datetime.now()
temp_time = nowtime - datetime.timedelta(days=7)
start_time = temp_time.strftime('%Y-%m-%dT%H:%MZ')
end_time = nowtime.strftime('%Y-%m-%dT%H:%MZ')
# [Top-level OSS directory for this run, file that records the oss_dir used by the
#  last full backup, directory holding one file per RDS instance with the time of
#  its last full backup, temporary download directory for backup files]
oss_dir = nowtime.strftime('%Y-%m-%d-BAK')
oss_tmpfile = rdsbak_dir + "/oss_dir.txt"
rds_tmpdir = rdsbak_dir + "/calculate"
rds_datadir = rdsbak_dir + "/data"
def Oss_Seg(dest_file, sour_file, preferred_part_size=100 * 1024):
    """Upload a local file to OSS with the multipart API, then delete the local copy.

    Uses the module-level ``bucket``. Nothing is printed on success; any
    failure propagates as an exception.

    Args:
        dest_file: Object key inside the bucket, e.g. ``etc/passwd.txt``.
        sour_file: Path of the local file to upload, e.g. ``/etc/passwd``.
        preferred_part_size: Part-size hint in bytes handed to
            ``determine_part_size``. Defaults to 100 KiB, the value that was
            previously hard-coded.

    Raises:
        Any exception raised by ``open`` or by the oss2 calls. Before the
        exception is re-raised the multipart upload is aborted so that the
        already-uploaded parts do not linger in the bucket.

    API reference:
        https://help.aliyun.com/document_detail/88434.html
    """
    total_size = os.path.getsize(sour_file)
    part_size = determine_part_size(total_size, preferred_size=preferred_part_size)
    upload_id = bucket.init_multipart_upload(dest_file).upload_id
    # (part number, etag) of every uploaded part; required to complete the upload.
    parts = []
    try:
        # Upload the file part by part.
        with open(sour_file, 'rb') as fileobj:
            part_number = 1
            offset = 0
            while offset < total_size:
                num_to_upload = min(part_size, total_size - offset)
                result = bucket.upload_part(
                    dest_file,
                    upload_id,
                    part_number,
                    SizedFileAdapter(fileobj, num_to_upload),
                )
                parts.append(PartInfo(part_number, result.etag))
                offset += num_to_upload
                part_number += 1
        bucket.complete_multipart_upload(dest_file, upload_id, parts)
    except Exception:
        # Do not leave a dangling multipart upload behind on failure.
        bucket.abort_multipart_upload(dest_file, upload_id)
        raise
    # The local file is only a transfer buffer; remove it once it is in OSS.
    if os.path.exists(sour_file):
        os.unlink(sour_file)
def Oss_Put(dest_file, sour_file):
    """Upload a file to OSS unless the object already exists, then verify it.

    Uses the module-level ``bucket`` and delegates the transfer to ``Oss_Seg``.

    Args:
        dest_file: Object key inside the bucket, e.g. ``etc/passwd.txt``.
        sour_file: Path of the local file to upload, e.g. ``/etc/passwd``.

    Returns:
        None. Progress and the outcome are reported on stdout.
    """
    if bucket.object_exists(dest_file):
        print("OSS上的备份" + dest_file + "已经存在,不进行上传操作")
        return

    print("准备上传到:" + dest_file)
    start_puttime = datetime.datetime.now()
    Oss_Seg(dest_file, sour_file)
    # total_seconds() keeps whole days; timedelta.seconds would drop them.
    spend_puttime = int((datetime.datetime.now() - start_puttime).total_seconds())

    # Confirm the object is really there instead of trusting the upload call.
    if bucket.object_exists(dest_file):
        print("上传完成,用时: " + str(spend_puttime) + "秒")
    else:
        # Previously this case was silent, which hid failed uploads.
        print("上传失败,OSS上未找到: " + dest_file)
def File_Oper(rds_id, file_name, file_downlink):
    """Download one backup file to local disk and hand it to ``Oss_Put``.

    The download is skipped when the target object already exists in OSS.
    Uses the module-level ``bucket``, ``rds_datadir`` and ``oss_dir``.

    Args:
        rds_id: RDS instance id, e.g. ``rm-2zeb3g8b77in755x8``.
        file_name: Name to store the file under, e.g.
            ``2020-09-12T06:03:03Z.tar.gz``.
        file_downlink: Download URL of the full or binlog backup. Surrounding
            single quotes, if any, are stripped.

    Returns:
        None. Progress and failures are reported on stdout; network errors do
        not raise, so the caller can continue with the next file.
    """
    # [local directory of this instance, local file path, object key in OSS]
    file_dirpath = rds_datadir + "/" + rds_id
    file_filepath = file_dirpath + "/" + file_name
    oss_filepath = oss_dir + "/" + rds_id + "/" + file_name

    # Check OSS first so an already-stored file is not downloaded again.
    if bucket.object_exists(oss_filepath):
        print(file_name + "已经存在,跳过下载和上传")
        return

    print("开始下载并保存到" + file_filepath)
    # Creates rds_datadir as well when it is missing; safe if it already exists.
    os.makedirs(file_dirpath, exist_ok=True)
    # Drop any leftover from an earlier, interrupted run.
    if os.path.exists(file_filepath):
        os.unlink(file_filepath)

    # Streamed download in 100 KiB chunks.
    start_opertime = datetime.datetime.now()
    file_downlink = file_downlink.strip("'")
    try:
        # (connect, read) timeouts: without them a stalled connection hangs forever.
        response_data_file = requests.get(file_downlink, stream=True, timeout=(30, 300))
        try:
            # An HTTP error body must not be saved as if it were a backup.
            response_data_file.raise_for_status()
            with open(file_filepath, 'wb') as f:
                for chunk in response_data_file.iter_content(chunk_size=102400):
                    if chunk:
                        f.write(chunk)
        finally:
            response_data_file.close()
    except requests.RequestException as err:
        # Only the exception type is printed; the message may contain the signed URL.
        print("下载失败: " + type(err).__name__)
        return

    # Sanity-check the downloaded file, then upload it to OSS.
    if os.path.isfile(file_filepath):
        spend_opertime = int((datetime.datetime.now() - start_opertime).total_seconds())
        print("下载文件完成,用时:" + str(spend_opertime) + "秒")
        # Files under 200 KiB are treated as a failed download.
        if os.path.getsize(file_filepath) < 204800:
            print("下载失败")
        else:
            Oss_Put(oss_filepath, file_filepath)
def Rds_Allbak(rds_id):
    """Back up the first successful automated full backup of one RDS instance.

    Queries the backups between the module-level ``start_time`` and
    ``end_time``, passes the first entry to ``File_Oper`` and records that
    backup's start time in ``rds_tmpdir/<rds_id>`` for later binlog runs.
    Uses the module-level ``client`` and ``rds_tmpdir``.

    Args:
        rds_id: RDS instance id, e.g. ``rm-2zeb3g8b77in755x8``.

    Returns:
        None. If the query returns no backup, a message is printed and the
        instance is skipped.

    API reference:
        https://help.aliyun.com/document_detail/26273.html
    """
    # [filter parameters, see the Aliyun API documentation]
    request = DescribeBackupsRequest()
    request.set_accept_format('json')
    request.set_DBInstanceId(rds_id)
    request.set_BackupStatus("Success")
    request.set_BackupMode("Automated")
    request.set_StartTime(start_time)
    request.set_EndTime(end_time)
    response = client.do_action_with_exception(request)
    rds_info = json.loads(response)

    backups = (rds_info.get("Items") or {}).get("Backup") or []
    if not backups:
        # Indexing [0] here used to raise and abort the remaining instances.
        print(rds_id + "在" + start_time + " - " + end_time + "内没有可用的全备份,跳过")
        return

    # NOTE(review): assumes the API lists the most recent backup first -- confirm.
    latest_backup = backups[0]
    rds_starttime = latest_backup["BackupStartTime"]
    rds_downlink = latest_backup["BackupDownloadURL"]
    rds_name = rds_starttime + ".tar.gz"
    File_Oper(rds_id, rds_name, rds_downlink)

    # Record the backup's own start time (not the script's run time); the
    # binlog run reads it back as "YYYY-MM-DD HH:MM:SS".
    rds_timefile = rds_tmpdir + "/" + rds_id
    str_time = rds_starttime[0:10] + " " + rds_starttime[11:19]
    with open(rds_timefile, 'w') as rds_file:
        rds_file.write(str_time)
def Rds_Binlog(rds_id, date_baktime):
    """Back up every binlog file from the last full backup onwards.

    Queries the binlog files between ``date_baktime`` and seven days later,
    page by page, and passes each one to ``File_Oper``. Uses the module-level
    ``client``.

    Args:
        rds_id: RDS instance id, e.g. ``rm-2zeb3g8b77in755x8``.
        date_baktime: ``datetime`` of the last full backup, e.g.
            ``2020-09-11 18:45:48``.

    Returns:
        None.

    API reference:
        https://help.aliyun.com/document_detail/26291.html
    """
    # [window start, window end = start + 7 days]
    start_binlog = date_baktime.strftime('%Y-%m-%dT%H:%M:%SZ')
    end_binlog = (date_baktime + datetime.timedelta(days=7)).strftime('%Y-%m-%dT%H:%M:%SZ')
    print("增量备份区段" + start_binlog + " - " + end_binlog )

    # The API is paginated; reading only the first page silently skips binlogs.
    page_size = 100
    page_number = 1
    while True:
        # [filter parameters, see the API documentation]
        request = DescribeBinlogFilesRequest()
        request.set_accept_format('json')
        request.set_DBInstanceId(rds_id)
        request.set_StartTime(start_binlog)
        request.set_EndTime(end_binlog)
        request.set_PageSize(page_size)
        request.set_PageNumber(page_number)
        response = client.do_action_with_exception(request)
        rds_binlog = json.loads(response)

        binlog_files = (rds_binlog.get('Items') or {}).get('BinLogFile') or []
        # Download each binlog of this page and upload it to OSS.
        for binlog_info in binlog_files:
            File_Oper(rds_id, binlog_info['LogFileName'], binlog_info["DownloadLink"])

        # A short (or empty) page means there is nothing further to fetch.
        if len(binlog_files) < page_size:
            break
        page_number += 1
def Check_Dir(tmp_dir):
    """Create ``tmp_dir`` (including missing parents) if it does not exist.

    Args:
        tmp_dir: Directory path, e.g. ``/rdsbak``.

    Returns:
        None.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(tmp_dir, exist_ok=True)
def Start_Bak(options):
    """Run a full or an incremental (binlog) backup for every configured instance.

    Uses the module-level ``rdsbak_dir``, ``rds_tmpdir``, ``rds_datadir``,
    ``oss_tmpfile``, ``rds_list`` and ``end_time``; in ``binlog`` mode it
    rebinds the module-level ``oss_dir`` to the directory of the last full
    backup.

    Args:
        options: ``"allbak"`` for a full backup or ``"binlog"`` for an
            incremental one. Any other value prints the usage line.

    Returns:
        None.
    """
    global oss_dir

    # Make sure the working directories exist.
    Check_Dir(rdsbak_dir)
    Check_Dir(rds_tmpdir)
    Check_Dir(rds_datadir)
    print("")
    print(end_time + "------开始备份------")

    if options == "allbak":
        # Record the directory name of this full backup first, so later
        # binlog runs store their files next to it.
        with open(oss_tmpfile, 'w') as oss_file:
            oss_file.write(oss_dir)
        for rds_id in rds_list:
            print("开始全备份" + rds_id)
            Rds_Allbak(rds_id)
            print("")
    elif options == "binlog":
        # Without a recorded full backup there is no directory to add to;
        # opening the missing file used to raise FileNotFoundError.
        if not os.path.exists(oss_tmpfile):
            print("未找到上次全备份的记录" + oss_tmpfile + ",请先执行allbak")
            return
        # Reuse the directory of the last full backup; strip stray newlines
        # so they do not end up inside OSS object keys.
        with open(oss_tmpfile, 'r') as oss_file:
            oss_dir = oss_file.read().strip()
        for rds_id in rds_list:
            # [file holding the time of this instance's last full backup]
            rds_timefile = rds_tmpdir + "/" + rds_id
            if not os.path.exists(rds_timefile):
                print(rds_id + "不存在上次全备份,跳过本次binlog备份")
                continue
            with open(rds_timefile, 'r') as rds_file:
                str_baktime = rds_file.read().strip()
            try:
                date_baktime = datetime.datetime.strptime(str_baktime, '%Y-%m-%d %H:%M:%S')
            except ValueError:
                # One corrupt time file must not stop the remaining instances.
                print(rds_id + "的上次全备份时间记录无效,跳过本次binlog备份")
                continue
            print("开始增量备份" + rds_id)
            Rds_Binlog(rds_id, date_baktime)
            print("")
    else:
        print("使用 ./rds_data_bak.py allbak|binlog 进行备份")
if __name__ == '__main__':
    # Exactly one command-line argument (the backup mode) is expected;
    # anything else prints the usage line.
    if len(sys.argv) == 2:
        Start_Bak(sys.argv[1])
    else:
        print("使用 ./rds_data_bak.py allbak|binlog 进行备份")
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/rxys/script-tools.git
git@gitee.com:rxys/script-tools.git
rxys
script-tools
ScriptTools
master

搜索帮助