1 Star 2 Fork 0

speedreg/etl

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
etl.pl 13.46 KB
一键复制 编辑 原始数据 按行查看 历史
speedreg 提交于 2018-11-16 09:45 . 20181116
#!/usr/bin/perl
use strict;
use warnings;
use File::Spec;
use File::Path qw(mkpath);
use Data::Dumper;
use Encode;
use File::Copy;
use JSON;
use Try::Tiny;
use Net::SFTP::Foreign;
use Fcntl ':mode';
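### Scan the configured directory: start a process when the directory contains files; delete the directory when it is empty and is not today's directory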
#
my (%config,%logger,%platcfg,%hashtmp,%dbcfg,%tabcfg,%etl,%ftp);
# Resolve the absolute path of this script and split it into volume,
# directory and file name; every working directory is derived from it.
$config{dirs}{path_curf} = File::Spec->rel2abs(__FILE__);
@{ $config{dirs} }{qw(scrvol scrdir file_script)} = File::Spec->splitpath($config{dirs}{path_curf});
$config{dirs}{root} = join('', @{ $config{dirs} }{qw(scrvol scrdir)});

# Fixed sub-directories that live next to the script.
foreach my $layout ( [conf => "config/"], [logger => "logs/"], [tmp => "files/"], [class => "class/"] ) {
    my ($slot, $subdir) = @$layout;
    $config{dirs}{$slot} = $config{dirs}{root}.$subdir;
}
#$config{dirs}{scan} = $config{dirs}{tmp}."scanroot/";
# Initial scan directory (the original note says it is meant to come from a
# config file later on).
$config{dirs}{scan} = "/home/xiqing/apps/Projects/Market/file/recv/";
#$config{dirs}{scan} = "/home/xiqing/apps/Projects/Market/file/sendcache/";

# Control files: run.lock marks a running instance, run.exit asks it to stop.
foreach my $ctl (qw(lock exit)) {
    $config{files}{$ctl."file"} = $config{dirs}{logger}."run.".$ctl;
}
try {
# Single-instance guard: an existing lock file means another copy is
# (or appears to be) running, so leave immediately.
if (-e $config{files}{lockfile}) {
    print "The Process is running.. \r\n";
    exit;
}
# The lock file lives in the logs directory, which may not exist yet on a
# fresh install; create it first so the lock can actually be written.
mkpath($config{dirs}{logger},0,0775) unless -d $config{dirs}{logger};
# Create the (empty) lock file. A failure here is fatal: without a lock
# nothing would stop a second instance from starting.
open(my $lock_fh, ">", $config{files}{lockfile})
    || die "cannot create the lock file: $config{files}{lockfile}: $! \r\n";
close($lock_fh);
# Import the helper libraries: every regular file found in the class
# directory is loaded with require.
opendir(my $class_dh, "$config{dirs}{class}") || die "Cannot open dir : $config{dirs}{class} ";
my @class_entries = readdir($class_dh);
closedir($class_dh);
foreach my $entry (@class_entries) {
    # Skip ".", ".." and anything else that is not a plain file.
    next unless -f "$config{dirs}{class}/$entry";
    require($config{dirs}{class}.$entry);
}
Process_Start:
# Top of the polling cycle. When the exit-control file is present, jump
# straight to the shutdown section instead of running another cycle.
# NOTE(review): on the very first pass this jump happens before the log file
# path has been computed further down.
goto Process_End if -e $config{files}{exitfile};
# ------------------------------
# nowsrl()
# Current serial number: local time formatted as YYYYMMDDhhmmss followed by
# a three-digit millisecond field, i.e. always exactly 17 digits.
#
# Callers take substr(...,0,4|6|8) for year / month / date and append the
# whole value to generated file names, so a constant width matters.
# The previous version emitted seconds*1000 through "%02d", which produced a
# 2-, 4- or 5-digit suffix carrying no sub-second information.
# ------------------------------
sub nowsrl {
    # Loaded here so the sub is self-contained; Time::HiRes is a core module.
    require Time::HiRes;
    my ( $epoch, $usec ) = Time::HiRes::gettimeofday();
    my ( $s,$min,$h,$d,$m,$y ) = (localtime($epoch))[0,1,2,3,4,5];
    $y += 1900;    # localtime() years count from 1900
    $m ++;         # localtime() months are 0-based
    return sprintf("%04d%02d%02d%02d%02d%02d%03d",$y,$m,$d,$h,$min,$s,int($usec/1000));
}
# Derive the log location for this cycle: logs/<YYYY>/<YYYYMM>/init_<YYYYMMDD>.log
# The clock is read once so that year, month and date always belong to the
# same instant (three separate reads could disagree across midnight).
my $log_stamp = nowsrl();
$logger{date}{year} = substr($log_stamp,0,4);
$logger{date}{month} = substr($log_stamp,0,6);
$logger{date}{date} = substr($log_stamp,0,8);
$logger{init}{dirs} = $config{dirs}{logger}.$logger{date}{year}.'/'.$logger{date}{month}.'/';
$logger{init}{logfile} = $logger{init}{dirs}.'init_'.$logger{date}{date}.'.log';
# Create the dated log directory on first use.
mkpath($logger{init}{dirs},0,0775) unless -e $logger{init}{dirs};
&printlog($logger{init}{logfile},&nowtime." Process Starting \r\n");
# ------------------------------
# load_pipe_config($path)
# Reads one "name|json" configuration file and returns a hash reference
# mapping each name to its decoded JSON value.
#   - empty lines and lines starting with "#" are skipped
#   - the line is split on the FIRST "|" only, so the JSON part may itself
#     contain "|" characters
# Dies when the file cannot be opened (the message names the file) or when
# from_json rejects a value.
# ------------------------------
sub load_pipe_config {
    my ($path) = @_;
    my %parsed;
    open(my $fh, "<", $path) || die "cannot open the file: $path: $! \r\n";
    while (my $content = <$fh>) {
        chomp $content;
        next if length($content) == 0 || substr($content,0,1) eq "#";
        my ($name,$value) = split(/\|/,$content,2);
        $parsed{$name} = from_json($value);
    }
    close $fh;
    return \%parsed;
}
# The four configuration files are re-read on every polling cycle. Each hash
# is replaced wholesale so entries removed from a file disappear as well.
$config{files}{etl} = $config{dirs}{conf}.'etl.config';
%etl = %{ load_pipe_config($config{files}{etl}) };
$config{files}{dbconfig} = $config{dirs}{conf}.'db.config';
%dbcfg = %{ load_pipe_config($config{files}{dbconfig}) };
$config{files}{ftp} = $config{dirs}{conf}.'ftp.config';
%ftp = %{ load_pipe_config($config{files}{ftp}) };
$config{files}{table} = $config{dirs}{conf}.'table.config';
%tabcfg = %{ load_pipe_config($config{files}{table}) };
# $config{files}{config} = $config{dirs}{conf}.'platid.config';
# open(FILE,"<",$config{files}{config}) || die "cannot open the file: $config{files}{config} \n";
# while (my $content = <FILE>) {
# chomp $content;
# if ($content) {
# if (substr($content,0,1) eq "#") {
# next;
# } else {
# my %hashtmp;
# ($hashtmp{platid},$hashtmp{platname},$hashtmp{dirs},$hashtmp{runscript},$hashtmp{runlock}) = split(/\|/,$content);
# $platcfg{$hashtmp{platid}} = \%hashtmp;
# }
# }
# }
# close FILE;
#print Dumper(\%platcfg);
# Working directories below files/: downloaded data and merged import files.
@{ $config{dirs} }{qw(datafile tabletmp)} = map { $config{dirs}{tmp}.$_ } ('datafile/', 'table/');
# Two progress lines, each with its own timestamp.
foreach my $progress (" init successful \r\n", " Starting ETL ... \r\n") {
    &printlog($logger{init}{logfile},&nowtime . $progress);
}
#循环ETL任务列表
# Run every ETL task defined in etl.config. For a task flagged valid:
#   1. connect to its SFTP server and list the remote directory,
#   2. download the matching files into a dated local directory,
#   3. delete each downloaded file from the server and append its lines to
#      one merged import file,
#   4. load the merged file into every database listed for the task.
# printlog / nowtime / addcolumn / func_sqlldr / func_mysqlldr are not
# defined in this file (presumably they come from the class libraries loaded
# at startup - confirm).
foreach my $key ( keys %etl ) {
    my $subhash = $etl{$key};
    my %subhash = %$subhash;    # per-cycle shallow copy of the task settings
    my %info;                   # scratch state for this task
    $subhash{reloaddir} = $config{dirs}{tmp}.'reload/';
    &printlog($logger{init}{logfile},&nowtime." etlname: $key ");
    if (defined $subhash{isvalid} && $subhash{isvalid} eq "1") {
        # The task is enabled.
        &printlog($logger{init}{logfile}," ---> valid:deal,Connect Remote... \r\n");
        try {
            if ($ftp{$subhash{ftp}}{type} eq "sftp") {
                # Local download directory: datafile/YYYY/YYYYMM/YYYYMMDD/<localdir>.
                # The clock is read once so the three components always agree.
                my $today = substr(nowsrl(),0,8);
                $info{localdir} = $config{dirs}{datafile}.substr($today,0,4).'/'.substr($today,0,6).'/'.$today.'/'.$subhash{localdir};
                mkpath($info{localdir},0,0775) unless -e $info{localdir};
                # autodie => 1: failing SFTP calls die and are handled by the
                # catch block of this try.
                my $sftp = Net::SFTP::Foreign->new($ftp{$subhash{ftp}}{server},
                    user => $ftp{$subhash{ftp}}{account},
                    password => $ftp{$subhash{ftp}}{password},
                    port => $ftp{$subhash{ftp}}{port},
                    timeout => 300,
                    autodie => 1);
                &printlog($logger{init}{logfile},&nowtime." search: $subhash{remotedir} \r\n");
                $sftp->setcwd($subhash{remotedir});
                # Select the remote entries to fetch. An entry is wanted when
                #   - its mtime is more than scanagin seconds in the past,
                #   - (if the task sets filekey) the "_"-separated field at
                #     index keypos of its name equals filekey, and
                #   - it is a regular file.
                $info{ls} = $sftp->ls("./", wanted => sub {
                    my $entry = $_[1];
                    my $age = time() - $entry->{a}->{mtime};
                    return 0 unless $age > $subhash{scanagin};
                    if ($subhash{filekey}) {
                        my @fileana = split(/_/,$entry->{filename});
                        my $filekey = $fileana[$subhash{keypos}];
                        # A name with too few fields simply does not match.
                        return 0 unless defined $filekey && $filekey eq $subhash{filekey};
                    }
                    return S_ISREG($entry->{a}->perm);
                });
                &printlog($logger{init}{logfile},&nowtime." find ".scalar(@{$info{ls}})." files ...\r\n");
                # filelist: bare names; filename: names prefixed with remotedir.
                foreach my $found ( @{$info{ls}} ) {
                    my %subkey = %{$found};
                    push @{$info{filelist}} , $subkey{filename};
                    push @{$info{filename}} , $subhash{remotedir}.$subkey{filename};
                }
                if ($info{filename} && @{$info{filename}} > 0) {
                    &printlog($logger{init}{logfile},&nowtime." Download ".scalar(@{$info{filename}})." files ...\r\n");
                    # A download error is only logged; the per-file existence
                    # check below decides which files are processed.
                    try {
                        $sftp->mget($info{filename},"$info{localdir}");
                    } catch {
                        &printlog($logger{init}{logfile},&nowtime." --->error:$_ \r\n");
                    };
                    &printlog($logger{init}{logfile},&nowtime." ---> Done \r\n");
                }
                # Verify each download, remove it from the server and merge it.
                if ($info{filelist} && @{$info{filelist}} > 0) {
                    foreach my $fname ( @{$info{filelist}} ) {
                        my $localfile = $info{localdir}.$fname;
                        my $remotefile = $subhash{remotedir}.$fname;
                        &printlog($logger{init}{logfile},&nowtime." check localfile: $localfile ");
                        if (! -e $localfile) {
                            # Not downloaded: leave the remote file for a later cycle.
                            &printlog($logger{init}{logfile}," failed \r\n");
                            next;
                        }
                        &printlog($logger{init}{logfile}," ---> sucess \r\n");
                        next unless $remotefile;
                        # Delete the remote copy. If that fails, the local copy
                        # is removed too and the file is not imported.
                        my $errors = 0;
                        &printlog($logger{init}{logfile},&nowtime." delete remote file. $remotefile ..");
                        $sftp->rremove($remotefile, on_error => sub { $errors++});
                        if ($errors != 0) {
                            &printlog($logger{init}{logfile}," --->failed,delete local file");
                            unlink($localfile);
                            &printlog($logger{init}{logfile}," --->done \r\n");
                            next;
                        }
                        &printlog($logger{init}{logfile}," --->sucess \r\n");
                        push @{$info{listlocal}} , $localfile;
                        # Name the merged import file once per task and cycle.
                        if (! $info{imp}{moveto}) {
                            $info{imp}{filehandle} = "$subhash{name}"."_".nowsrl();    # base name, also used to name the loader files
                            $info{imp}{filename} = $info{imp}{filehandle}.'.txt';    # file name
                            $info{imp}{moveto} = $config{dirs}{tabletmp}.$info{imp}{filename};    # full path
                            # Make sure the target directory exists before appending.
                            mkpath($config{dirs}{tabletmp},0,0775) unless -d $config{dirs}{tabletmp};
                            &printlog($logger{init}{logfile},&nowtime." Create Import File: $info{imp}{moveto} \r\n");
                        }
                        &printlog($logger{init}{logfile},&nowtime." Append File $fname --->>> $info{imp}{moveto}");
                        # Merge: strip CR/LF from every line, optionally prefix
                        # the value built by addcolumn() from the first 10
                        # characters, and append '$$' plus the import file name.
                        # Both opens are checked; the remote copy is already
                        # gone at this point, so a silent failure here would
                        # skip the data without any trace in the log.
                        open(my $fh_data, "<", $localfile) || die "cannot open the file: $localfile \n";
                        open(my $fh_move, ">>", $info{imp}{moveto}) || die "cannot open the file: $info{imp}{moveto}: $! \n";
                        while (my $content = <$fh_data>) {
                            $content =~ s/[\r\n]//g;
                            if ($tabcfg{$subhash{table}}{addcolumn}) {
                                $content = &addcolumn(substr($content,0,10)).$content.'$$'.$info{imp}{filename}."\n";
                            } else {
                                $content = $content.'$$'.$info{imp}{filename}."\n";
                            }
                            print {$fh_move} $content;
                        }
                        close $fh_data;
                        # Buffered write errors only surface at close.
                        close($fh_move) || die "cannot write the file: $info{imp}{moveto}: $! \n";
                        &printlog($logger{init}{logfile}," --->>> Succ \r\n");
                    }
                }
                $sftp->disconnect;
            } elsif ($ftp{$subhash{ftp}}{type} eq "ftp") {
                # Plain FTP transfer is not implemented.
            }
        } catch {
            &printlog($logger{init}{logfile},&nowtime." $_ \r\n");
        };
        # Import the merged file into every database configured for the task.
        if ($info{imp}{moveto}) {
            try {
                my $loadres = 1;    # stays true only if every target loads
                foreach my $element (@{$subhash{db}}) {
                    # Reset per target so an unsupported oratype can never
                    # inherit the success flag of the previous database.
                    my $loadtmp = 0;
                    if ($dbcfg{$element}{oratype} eq "oracle") {
                        $loadtmp = func_sqlldr($info{imp}{filehandle}, $tabcfg{$subhash{table}}, $dbcfg{$element},$logger{init}{logfile}, $config{dirs}{tabletmp}, \%subhash);
                    } elsif ($dbcfg{$element}{oratype} eq "mysql") {
                        $loadtmp = func_mysqlldr($info{imp}{filehandle}, $tabcfg{$subhash{table}}, $dbcfg{$element},$logger{init}{logfile}, $config{dirs}{tabletmp}, \%subhash);
                    }
                    $loadres = 0 unless $loadtmp;
                }
                if ($loadres) {
                    # Everything loaded: remove the import file and the
                    # .ctl / .log files that share its name.
                    &printlog($logger{init}{logfile}, &nowtime." Load All Successful and Remove temp file ");
                    my $lsfile = $config{dirs}{tabletmp}.$info{imp}{filehandle}.'.txt';
                    my $lsctl = $lsfile.'.ctl';
                    my $lslog = $lsfile.'.log';
                    unlink($lsfile);
                    unlink($lsctl);
                    unlink($lslog);
                    &printlog($logger{init}{logfile}, " Succ !~ \r\n");
                } else {
                    # Keep the files so the failed load can be inspected.
                    &printlog($logger{init}{logfile}, &nowtime." Load failed and not Remove temp file \r\n");
                }
            } catch {
                &printlog($logger{init}{logfile},&nowtime." import Error $info{imp}{moveto} ... $_ \r\n ");
            };
        }
    } else {
        &printlog($logger{init}{logfile}," ---> invalid:Skip \r\n");
    }
}
# ------------------------------
# callback($sftp, $data, $offset, $size)
# Progress reporter: prints "Read <offset> / <size> bytes" followed by a
# carriage return, so successive calls overwrite the same console line.
# The first two arguments are accepted but not used.
# ------------------------------
sub callback
{
    my (undef, undef, $offset, $size) = @_;
    print "Read $offset / $size bytes\r";
}
#&checkdir($config{dirs}{scan},$logger{init}{logfile},$config{dirs}{tabletmp},\%tabcfg,\%dbcfg);
# &printlog($logger{init}{logfile},&nowtime." Scan Successful \n");
# &printlog($logger{init}{logfile},&nowtime." Check Process \n");
# foreach my $key ( keys %platcfg ) {
# my $subhash = $platcfg{$key};
# my %subhash = %$subhash;
# #print Dumper(\%subhash);
# if ($subhash{filecnt}) {
# &printlog($logger{init}{logfile},&nowtime." $subhash{platname} --> $subhash{platid} --> $subhash{runscript}");
# #判断进程是否启动(判断文件锁)
# my $lockfile = $subhash{dirs}.$subhash{runlock};
# if (-e $lockfile) {
# &printlog($logger{init}{logfile}," runing,skip \n");
# #next;
# } else {
# &printlog($logger{init}{logfile}," not run start... ");
# my $runfile = $subhash{dirs}.$subhash{runscript};
# if (-e $runfile) {
# system("cd $subhash{dirs};nohup $runfile >/dev/null 2>&1 &");
# &printlog($logger{init}{logfile}," Done \n");
# } else {
# &printlog($logger{init}{logfile}," Not Found The Script \n");
# }
# }
# #防止启动过快,runtime还没生成
# sleep 10;
# }
#
# }
#print Dumper(\%platcfg);
# End of one polling cycle: wait 15 minutes, then start over.
&printlog($logger{init}{logfile},&nowtime." Check Process End,and waiting 15mins again \n");
sleep 60*15;
goto Process_Start;
Process_End:
# Shutdown requested through the exit-control file. This label can be
# reached on the very first pass, before the log file path has been set, so
# fall back to the console in that case instead of logging to an undefined
# path.
if (defined $logger{init}{logfile}) {
    &printlog($logger{init}{logfile},&nowtime." Found Exit Ctrl..Exiting.\r\n");
} else {
    print "Found Exit Ctrl..Exiting.\r\n";
}
# Release the single-instance lock and consume the exit request.
unlink($config{files}{lockfile});
unlink($config{files}{exitfile});
} catch {
# Last-resort handler for anything that escaped the main body.
print "Process Error $_ \r\n";
#&printlog($logger{init}{logfile},&nowtime." $_ \r\n");
# Release the single-instance lock. The script stops after this handler, and
# a lock left behind would make every later start exit with
# "The Process is running..". An "already running" start leaves through
# exit, not through this handler, so another instance's lock is never
# removed here.
unlink($config{files}{lockfile});
};
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Perl
1
https://gitee.com/speedreg/etl.git
git@gitee.com:speedreg/etl.git
speedreg
etl
etl
master

搜索帮助