1 Star 1 Fork 3

yangyi336 / gpmagic

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
gpddlrestore 10.40 KB
一键复制 编辑 原始数据 按行查看 历史
water32 提交于 2019-08-25 09:52 . Add files via upload
#!/usr/bin/perl
use strict;
use MIME::Base64;
use threads;
use Thread::Queue;
use Getopt::Long qw(:config bundling no_ignore_case no_autoabbrev passthrough);
use IO::Handle qw();
use Fcntl qw(:flock);
use POSIX;
use Time::HiRes qw(usleep nanosleep);
# Command-line option values; filled in by getOption() and normalised by checkOption().
my ($DATABASE_NAME,$PORT,$FROM_FILE,$OBJECT_BIT,$BATCH_SIZE,$IS_HELP,$VERSION);
# Region types whose presence (only exists() is tested, the values are unused) makes
# restoreFromFile() run the region with $BATCH_SIZE worker threads; any other region
# type is run with a single worker.
my %BATCH_REGION_HASH = ("Language" => "","Schema" => "","Type" => "","Function" => "","Sequence" => "","Table" => "","ExtTable" => "","View" => "","Index" => "","Comment" => "");
# Name of the region currently being read; set by restoreFromFile() from a region-start marker line.
my $REGION_TYPE;
# One bit per region type. restoreRegion() ANDs the region's bit with $OBJECT_BIT
# (the --object-bit mask) to decide whether the region is restored.
my %OBJECT_BIT_HASH = ("ResourceQueue" => 8192,"Role" => 4096,"TableSpace" => 2048,"RoleSetting" => 1024,"Language" => 512, "Schema" => 256, "Type" => 128, "Function" => 64,
"Sequence" => 32,"Table" => 16,"ExtTable" => 8,"View" => 4,"Index" => 2,"Comment" => 1);
# $DEFAULT_BATCH is the fallback for $SQL_BATCH_SIZE when a region type is missing from
# %OBJECT_BATCH_HASH; $SQL_BATCH_SIZE is the per-region task-count threshold read by executeRestore().
my ($DEFAULT_BATCH,$SQL_BATCH_SIZE) = (1,1);
# Per-region value assigned to $SQL_BATCH_SIZE by restoreFromFile().
my %OBJECT_BATCH_HASH = ("ResourceQueue" => 16,"Role" => 16,"TableSpace" => 16,"RoleSetting" => 16,"Language" => 4, "Schema" => 4, "Type" => 4, "Function" => 16,
"Sequence" => 16,"Table" => 64,"ExtTable" => 64,"View" => 64,"Index" => 1,"Comment" => 128);
# Handle on the lock file; opened and flock()ed in checkConflictProcess().
my ($LOCK_FILE_HANDLE);
# Input file handle (opened in restoreFromFile) and the two Thread::Queue objects
# created per region in restoreRegion(): tasks to run and messages to print.
my ($FILE_HANDLE,$TASK_QUEUE,$MSGE_QUEUE);
# Worker threads and the message-printing thread of the region being restored.
my (@TASK_THREAD,$MSGE_THREAD);
# Limits and default for -B. Only three values are supplied, so $CURR_BATCH_SIZE
# starts undefined; restoreFromFile() assigns it before each region is restored.
my ($BATCH_MAX,$BATCH_DEFAULT,$BATCH_MIN,$CURR_BATCH_SIZE) = (128,32,8);
# Control-character sequences: $CMD_SPLIT terminates the marker prefixes below,
# $SQL_DELIM is passed to psql as field separator (-F) and $RECORD_SPLIT as
# record separator (-R) in exequteQuery().
my ($CMD_SPLIT,$SQL_DELIM,$RECORD_SPLIT) = (chr(1).chr(0).chr(2).chr(7),chr(3).chr(4).chr(8),chr(5).chr(6).chr(9).chr(10));
# Line prefixes that restoreFromFile() recognises in the input file.
my $REGION_START = qq#--REGION-START-#.$CMD_SPLIT;
my $REGION_END = qq#--REGION-END---#.$CMD_SPLIT;
my $TASK_SPLIT = qq#--TASK---SPLIT-#.$CMD_SPLIT;
# Script name with any leading directory part removed.
(my $CMD_NAME = $0) =~ s!.*/(.*)!$1!;
# Last six characters of "000000" followed by the process id, used in log line prefixes.
my $MAIN_PID = substr("000000".$$,-6);
# Text printed by getOption() for -h/--help.
# The literal is delimited with qq followed by the hash character, so the text itself
# must never contain that character; $CMD_NAME and the $BATCH_* limits are interpolated.
my $HELP_MESSAGE = qq#COMMAND NAME: $CMD_NAME
Developed by Miao Chen
Work Email:
michen\@pivotal.io
Private Email:
miaochen\@mail.ustc.edu.cn
************************************************************************************************
SYNOPSIS
************************************************************************************************
$CMD_NAME [--database database] [--port port] [-f filename] [--object-bit hex string]
[-B batch_size] [-h|--help] [--version]
*****************************************************
OPTIONS
*****************************************************
--database <Database name>
Restore ddl to this database.
If not specified, use \$PGDATABASE
--port <master port>
Database port number, If not specified, use \$PGPORT or the default is 5432.
eg.
--port 5433
-f <filename>
Input ddl from this file.
--object-bit <hex string>
Match which object types will be restored, the default is FFFF.
FFFF means all objects
2000 means ResourceQueue
1000 means Role
0800 means TableSpace
0400 means RoleSetting
0200 means Language
0100 means Schema
0080 means Type
0040 means Function
0020 means Sequence
0010 means Table
0008 means ExtTable
0004 means View
0002 means Index
0001 means Comment
example:
--object-bit FFFF means all global objects and in database objects
--object-bit FC00 means all global objects
--object-bit 03FF means all in database objects
--object-bit 03FD means all in database objects exclude index
--object-bit 03FC means all in database objects exclude index and comment
--object-bit 0002 means in database only index objects
--object-bit 0001 means in database only comment objects
-B <batch_size>
Sets the maximum number of queries that $CMD_NAME concurrently restores to the database.
If not specified, the default is $BATCH_DEFAULT.
The max is $BATCH_MAX.
The min is $BATCH_MIN.
-h|--help
Displays the online help.
--version
Displays the command version.
example:
$CMD_NAME --database postgres --port 5432 -f postgres.ddl
$CMD_NAME -h | --help
#;
# Emit one timestamped log line.
#   $flag    - severity tag shown in brackets; "ERROR" sends the line to STDERR,
#              any other tag sends it to STDOUT.
#   $message - log text; a newline is appended here.
# Line layout: YYYYMMDD:HH:MM:SS.<MAIN_PID>-[<flag>]-:<message>
sub printMessage{
    my $flag = shift;
    my $message = shift;
    my $stamp = strftime("%Y%m%d:%H:%M:%S.",localtime);
    my $line = $stamp.$MAIN_PID."-[".$flag."]-:".$message."\n";
    my $target = ("ERROR" eq $flag) ? \*STDERR : \*STDOUT;
    print {$target} $line;
}
# Terminate the process.
#   $code - exit status handed to exit().
sub exitMain{
    my $code = shift;
    exit($code);
}
# Report a fatal error and stop: logs $message with the ERROR tag (STDERR),
# prints a one-line usage hint on the default output handle, then exits with status 1.
sub errorMessage{
    my $message = $_[0];
    printMessage("ERROR",$message);
    print "Usage: ".$CMD_NAME." [-h|--help] [options]\n";
    exitMain(1);
}
# Return a copy of the argument with leading and trailing whitespace removed.
# The caller's variable is not modified.
sub trim{
    my $string = shift;
    $string =~ s/^\s+//;    # leading run
    $string =~ s/\s+$//;    # trailing run
    return $string;
}
# Print the first argument, whitespace-trimmed, followed by a single newline.
sub output{
    my $text = trim($_[0]);
    print $text."\n";
}
# Run a batch of SQL through psql and return everything psql printed.
#   $query_sql    - SQL text to execute.
#   $thread_index - worker number; selects the per-worker temp script
#                   /tmp/.gpdbrestore.script.<index>.
# Returns psql's combined stdout/stderr as one string. Output is read in records
# terminated by $RECORD_SPLIT (psql -R); the terminators are stripped and the
# records are joined with "\n". The "psql:<script>:<line>: " prefix is removed by sed.
# Calls errorMessage() (which exits) when the temp script cannot be opened or
# completely written.
sub exequteQuery{
    my ($query_sql,$thread_index) = @_;
    my $script_file = "/tmp/.gpdbrestore.script.".$thread_index;
    my $script_handle;
    if(!open($script_handle,">",$script_file)){
        errorMessage("Can't open temp file: $script_file");
    }
    # Buffered write errors (e.g. a full /tmp) only surface at close, so check both;
    # otherwise psql would silently run a truncated script.
    my $written = print {$script_handle} $query_sql;
    my $closed = close($script_handle);
    if(!$written || !$closed){
        errorMessage("Can't write temp file: $script_file");
    }
    my $CMDS = "set -o pipefail;PGDATABASE=$DATABASE_NAME PGPORT=$PORT PGOPTIONS='-c statement_timeout=0 -c client_encoding=UTF8 -c standard_conforming_strings=ON";
    $CMDS = $CMDS." -c check_function_bodies=OFF -c client_min_messages=WARNING -c default_with_oids=OFF' ";
    $CMDS = $CMDS."psql -R '$RECORD_SPLIT' -tAXF '$SQL_DELIM' 2>&1 -f $script_file |sed -e 's/psql.*gpdbrestore.script.[0-9]*:[0-9]*: //'";
    my @result;
    {
        # Split and chomp on the psql record separator, for this block only.
        local $/ = $RECORD_SPLIT;
        @result = readpipe($CMDS);
        chomp(@result);
    }
    return join("\n",@result);
}
# Parse the command line into the option globals.
# Anything Getopt::Long leaves in @ARGV is treated as an unknown parameter and
# reported through errorMessage(). -h/--help prints the help text and --version
# prints the version line; both then exit with status 0.
sub getOption{
    my %option_target = (
        'database:s'   => \$DATABASE_NAME,
        'port:i'       => \$PORT,
        'f:s'          => \$FROM_FILE,
        'object-bit:s' => \$OBJECT_BIT,
        'B:i'          => \$BATCH_SIZE,
        'h|help!'      => \$IS_HELP,
        'version!'     => \$VERSION,
    );
    GetOptions(%option_target);
    unless(@ARGV == 0){
        errorMessage("Some parameters unknown: [@ARGV]\nPlease refer to $CMD_NAME --help");
    }
    if($IS_HELP){
        print $HELP_MESSAGE;
        exitMain(0);
    }
    if($VERSION){
        print $CMD_NAME." 1.0\n";
        exitMain(0);
    }
}
# Validate the parsed options and fill in defaults.
#   --database   : falls back to $PGDATABASE; fatal when still empty.
#   --port       : falls back to $PGPORT, then to 5432 as the help text documents.
#   -B           : replaced by $BATCH_DEFAULT when missing or outside
#                  [$BATCH_MIN, $BATCH_MAX].
#   -f           : must be given and must name an existing file.
#   --object-bit : converted from hex to a number; a result below 1 (option
#                  missing, or hex() returning 0) selects every object type (FFFF).
# Fatal problems are reported through errorMessage(), which exits.
sub checkOption{
    if(!defined $DATABASE_NAME || "" eq $DATABASE_NAME){
        $DATABASE_NAME = trim($ENV{'PGDATABASE'});
    }
    if(!defined $DATABASE_NAME || "" eq $DATABASE_NAME){
        errorMessage("Please specify parameter: --database");
    }
    if(!defined $PORT || "" eq $PORT){
        $PORT = trim($ENV{'PGPORT'});
    }
    if(!defined $PORT || "" eq $PORT){
        # The help text promises 5432 when neither --port nor $PGPORT is set.
        printMessage("NOTICE","Not specify, use default(5432): --port");
        $PORT = 5432;
    }
    if(!defined $BATCH_SIZE || "" eq $BATCH_SIZE || $BATCH_SIZE > $BATCH_MAX || $BATCH_SIZE < $BATCH_MIN){
        printMessage("NOTICE","Not specify or out of limit, use default($BATCH_DEFAULT): -B");
        $BATCH_SIZE = $BATCH_DEFAULT;
    }
    if(!defined $FROM_FILE || "" eq $FROM_FILE){
        # Without this the user would only see "No file exists named: " with an empty name.
        errorMessage("Please specify parameter: -f");
    }
    if(!-e $FROM_FILE){
        errorMessage("No file exists named: $FROM_FILE");
    }
    $OBJECT_BIT = hex(defined $OBJECT_BIT ? $OBJECT_BIT : "");
    if($OBJECT_BIT < 1){
        $OBJECT_BIT = hex("FFFF");
    }
}
# Make sure only one instance runs at a time: open /tmp/.gpddlrestore.lock for
# writing and take a non-blocking exclusive flock on it. The handle stays in
# $LOCK_FILE_HANDLE. Either step failing is fatal (errorMessage exits).
sub checkConflictProcess{
    my $lock_file = "/tmp/.gpddlrestore.lock";
    open($LOCK_FILE_HANDLE,">",$lock_file)
        or errorMessage("Can't open lock file: $lock_file");
    flock($LOCK_FILE_HANDLE, LOCK_EX | LOCK_NB)
        or errorMessage("Lock file is in using, you can try again later");
}
# Worker thread body: pull tasks from $TASK_QUEUE, group them into batches and run
# each batch through exequteQuery(), pushing the returned text onto $MSGE_QUEUE.
#   $index - worker number, forwarded to exequteQuery() to pick the temp script.
# An undef item on the task queue is the end-of-work marker. Only undef ends the
# worker: an empty-string task (restoreFromFile can produce one when a marker line
# directly follows another) is skipped instead of being mistaken for the marker,
# which previously stopped the worker and could leave later tasks unexecuted.
# A batch is closed once its task count exceeds $SQL_BATCH_SIZE; for the "Table"
# region every occurrence of PARTITION in a task counts as one more task.
# When finished, the worker enqueues undef on $MSGE_QUEUE to tell the message
# thread that it is done.
sub executeRestore{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($index) = @_;
    my $end = 0;
    while(!$end){
        my ($task_batch,$tasks) = ("",0);
        while(1){
            my $task = $TASK_QUEUE->dequeue();
            nanosleep(10000);
            if(!defined $task){
                $end = 1;
                last;
            }
            # Nothing to execute; keep collecting.
            next if "" eq $task;
            $task_batch = $task_batch."\n".$task;
            $tasks += 1;
            if($REGION_TYPE eq "Table"){
                $tasks += () = ($task =~ /PARTITION/g);
            }
            # NOTE(review): '>' lets a batch hold $SQL_BATCH_SIZE + 1 tasks - confirm this is intended.
            last if $tasks > $SQL_BATCH_SIZE;
        }
        if("" ne $task_batch){
            my $msg = exequteQuery($task_batch,$index);
            $MSGE_QUEUE->enqueue($msg);
        }
    }
    $MSGE_QUEUE->enqueue(undef);
}
# Message thread body: print every message the workers put on $MSGE_QUEUE and
# return once all workers have reported completion.
#   $batch_size - number of workers, i.e. how many undef end markers to wait for.
#                 Falls back to $CURR_BATCH_SIZE when not supplied.
# Defined queue items are printed via output(); each undef item counts one
# finished worker.
sub executeMessage{
    $SIG{'KILL'} = sub{threads->exit;};
    my ($batch_size) = @_;
    my $expected_workers = defined $batch_size ? $batch_size : $CURR_BATCH_SIZE;
    my $finished_workers = 0;
    # Numeric comparison: both sides are counts, not strings.
    while($finished_workers < $expected_workers){
        my $msg = $MSGE_QUEUE->dequeue();
        if(defined $msg){
            output($msg);
        }else{
            $finished_workers += 1;
        }
    }
}
# Restore one region: run the given tasks on $CURR_BATCH_SIZE worker threads.
#   @_ - the region's tasks (SQL text), in file order.
# Returns immediately when $REGION_TYPE has no entry in %OBJECT_BIT_HASH or its bit
# is not selected by $OBJECT_BIT. Otherwise fresh task and message queues are
# created, all tasks are queued, and one undef end marker is queued per worker as
# it is started. The message thread is joined first, then every worker.
sub restoreRegion{
    my $region_bit = $OBJECT_BIT_HASH{$REGION_TYPE};
    return if "" eq $region_bit;
    return if !($region_bit & $OBJECT_BIT);
    my @pending_tasks = @_;
    $TASK_QUEUE = Thread::Queue->new();
    $MSGE_QUEUE = Thread::Queue->new();
    $TASK_QUEUE->enqueue($_) for @pending_tasks;
    @TASK_THREAD = ();
    my $worker_id = 0;
    while($worker_id < $CURR_BATCH_SIZE){
        $TASK_QUEUE->enqueue(undef);
        # Scalar assignment on purpose: a thread's context is fixed when it is created.
        my $worker = threads->new(\&executeRestore,$worker_id);
        push @TASK_THREAD,$worker;
        $worker_id += 1;
    }
    $MSGE_THREAD = threads->new(\&executeMessage,$CURR_BATCH_SIZE);
    $MSGE_THREAD->join;
    $_->join for @TASK_THREAD;
}
# Read the DDL file line by line and restore it region by region.
#   region-start marker : sets $REGION_TYPE (marker prefix stripped, line trimmed)
#                         and announces the region.
#   task-split marker   : closes the current task (the lines collected so far).
#   region-end marker   : closes the current task, picks $SQL_BATCH_SIZE and
#                         $CURR_BATCH_SIZE for the region type and calls restoreRegion().
#   any other line      : is appended to the current task.
# Returns the lines collected after the last marker. Failure to open the file is
# fatal (errorMessage exits).
sub restoreFromFile{
    open($FILE_HANDLE,"<",$FROM_FILE)
        or errorMessage("Can't open file: $FROM_FILE");
    my (@task_list,@line_list);
    LINE: while(my $line = <$FILE_HANDLE>){
        if($line =~ /^$REGION_START/){
            ($REGION_TYPE = trim($line)) =~ s/$REGION_START//;
            output("Start to restore ".$REGION_TYPE.("." x 16));
            next LINE;
        }
        my $is_region_end = ($line =~ /^$REGION_END/);
        my $is_task_split = !$is_region_end && ($line =~ /^$TASK_SPLIT/);
        unless($is_region_end || $is_task_split){
            push @line_list,$line;
            next LINE;
        }
        # Both marker kinds terminate the task being collected.
        push @task_list,join("",@line_list);
        @line_list = ();
        next LINE unless $is_region_end;
        $SQL_BATCH_SIZE = exists $OBJECT_BATCH_HASH{$REGION_TYPE}
            ? $OBJECT_BATCH_HASH{$REGION_TYPE}
            : $DEFAULT_BATCH;
        $CURR_BATCH_SIZE = exists $BATCH_REGION_HASH{$REGION_TYPE}
            ? $BATCH_SIZE
            : 1;
        restoreRegion(@task_list);
        @task_list = ();
    }
    close $FILE_HANDLE;
    return @line_list;
}
# Remove the per-worker temp scripts and the lock file, then release the lock.
# The order matters: the scripts are deleted while the lock is still held, so
# another instance that starts right afterwards cannot have its freshly written
# /tmp/.gpdbrestore.script.* files removed by this cleanup.
sub clearTempFile{
    my $lock_file = "/tmp/.gpddlrestore.lock";
    # unlink on the expanded glob replaces spawning a shell for "rm -f".
    unlink glob("/tmp/.gpdbrestore.script.*");
    unlink $lock_file;
    if($LOCK_FILE_HANDLE){
        close $LOCK_FILE_HANDLE;
    }
}
# Program driver: runs the stages in order - option parsing, option validation,
# single-instance lock, restore, cleanup. Arguments passed to main are not used.
sub main{
    my @stages = (
        \&getOption,
        \&checkOption,
        \&checkConflictProcess,
        \&restoreFromFile,
        \&clearTempFile,
    );
    for my $stage (@stages){
        $stage->();
    }
}
# Script entry: build the invocation string (script name, a space, then the
# space-joined arguments), make both standard streams unbuffered, and run main.
my $command_string = "$0 @ARGV";
STDERR->autoflush(1);
STDOUT->autoflush(1);
main($command_string);
1
https://gitee.com/yangyi336/gpmagic.git
git@gitee.com:yangyi336/gpmagic.git
yangyi336
gpmagic
gpmagic
master

搜索帮助