diff --git a/eprints1.pl b/eprints1.pl index 1afc5efce59c3212617355172ddaa7eebe567ec7..c13555e68cc889fce874a30b472a61f637e823d3 100755 --- a/eprints1.pl +++ b/eprints1.pl @@ -114,12 +114,13 @@ my $die_nbn_already_defined= 0; # BEGIN OT2UT: Othes to Utheses # my $ot2ut_context= 'ot2ut-entw'; # TODO: parametrize my $ot2ut_context= 'ot2ut-test'; # TODO: parametrize +my $oma_sleep_time= 10; my %map_ot2ut_roles= ( - 'advisers' => [qw(betreuer betreuer_2 betreuer_3)], - 'coadvisers' => [qw(mitbetreuer mitbetreuer_2)], - 'assessors' => [qw(beurteiler_1 beurteiler_2 beurteiler_3)], + 'advisers' => [qw(betreuer betreuer_2 betreuer_3 )], + 'coadvisers' => [qw(mitbetreuer mitbetreuer_2 )], + 'assessors' => [qw( beurteiler_1 beurteiler_2 beurteiler_3 )], ); my %map_ot2ut_thesis_columns= @@ -221,6 +222,7 @@ while (defined ($arg= shift (@ARGV))) elsif ($opt eq 'buffer') { $ot2ut_eprint_status= 'buffer'; $no_doi= 1; } elsif ($opt eq 'no-doi') { $no_doi= defined($val) ? $val : 1; } elsif ($opt eq 'ignore-errors') { $ignore_errors= defined($val) ? $val : 1; } + elsif ($opt eq 'oma' || $opt eq 'PAF') { $op_mode= 'oma'; $do_upload= 1; } else { usage("unknown option $arg"); } } elsif ($arg =~ /^-(.+)/) @@ -342,6 +344,10 @@ elsif ($op_mode eq 'ot2ut') # WIP, started 2019-11-28 { ot2ut(@PARS); } +elsif ($op_mode eq 'oma') +{ + oma(); +} elsif ($op_mode eq 'debug_names') { debug_names(); @@ -2469,6 +2475,78 @@ EOX ($tsv_row, $datacite_xml_path); } +=head2 oma() + +Othes Migration Agent: listen to requests in MongoDB and perform them + +=cut + +sub oma +{ + my $ot2ut= get_any_db($cnf, 'ot2ut_database'); + + my $col_sync= $ot2ut->get_collection('sync'); + my $col_msg= $ot2ut->get_collection('messages'); + my $col_req= $ot2ut->get_collection('requests'); + + # send_message($col_msg, "oma is listening..."); + REQ: while($running) + { + my $row= $col_req->find_one({ agent => 'oma', status => 'new' }); + unless (defined ($row)) + { + print __LINE__, " oma sleeping until ", scalar localtime(time()+$oma_sleep_time), "\n"; + sleep ($oma_sleep_time); + next REQ; + } + + print __LINE__, " oma: row: ", Dumper($row); + + my $new_status= 'failed'; + if ($row->{action} eq 'send_batch') + { + my $bs= $row->{batch_size}; + $bs= 10 unless ($bs > 0 && $bs <= 100); + $MAX_SYNC= $bs; + + my $eprint_status= $row->{eprint_status}; + if ($eprint_status eq 'buffer') { $ot2ut_eprint_status= 'buffer'; $no_doi= 1; $ignore_errors= 1; } + else { $ot2ut_eprint_status= 'archive'; $no_doi= 0; $ignore_errors= 0; } + + $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'working' }}); + my ($synced, $res)= ot2ut(); + send_message($col_msg, "send_batch: $res"); + $new_status= 'done' if (@$synced); + } + elsif ($row->{action} eq 'send_ids') + { + $ignore_errors= 1; + $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'working' }}); + my ($synced, $res)= ot2ut(@{$row->{ids}}); + send_message($col_msg, "send_ids: $res"); + $new_status= 'done' if (@$synced); + } + + $col_req->update({ _id => $row->{_id}}, { '$set' => { status => $new_status }}); + } +} + +sub send_message +{ + my $col_msg= shift or return undef; + my $text= shift or return undef; + + print __LINE__, " sending message [$text]\n"; + my $msg= + { + message => $text, + priority => 'normal', + state => 'new', + to => 'oma' + }; + $col_msg->insert($msg); +} + sub ot2ut { my @eprint_ids= @_; @@ -2487,8 +2565,9 @@ sub ot2ut # print __LINE__, " utheses_faculty_list: ", Dumper($utheses_faculty_list); my %utheses_faculty_map= map { $_->{oracle_id} => $_ } @$utheses_faculty_list; $utheses_faculty_map= \%utheses_faculty_map; - Util::JSON::write_json_file('@facultymap.json', $utheses_faculty_map); - $Data::Dumper::Sortkeys= 1; + + # Util::JSON::write_json_file('@facultymap.json', $utheses_faculty_map); + # $Data::Dumper::Sortkeys= 1; # print __LINE__, " utheses_faculty_map: ", Dumper($utheses_faculty_map); exit; } @@ -2533,6 +2612,7 @@ sub ot2ut my $sync_info= $col_sync->find_one({eprint_id => $eprint_id}); my ($errors, $warnings, $row, $lastmod, $ut, $utheses_json_path, $files, $utheses_upload_result_json_path)= generate_utheses_metadata($epr, $eprint_id); + my ($eprint_status)= map { $row->{$_} } qw(eprint_status); print __LINE__, " sync_info=[$sync_info]\n"; if (defined ($sync_info)) @@ -2583,15 +2663,7 @@ sub ot2ut $el->{errors}= $errors; $col_sync->insert($el); - my $msg= - { - message => "upload error: eprint_id=[$eprint_id] lastmod=[$lastmod] [conversion errors]", - priority => 'normal', - state => 'new', - to => 'oma' - }; - $col_msg->insert($msg); - + send_message($col_msg, "upload error: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] [conversion errors]"); my $utheses_errors_json_path= 'othes/utheses_json/errors/' . $eprint_id . '.json'; Util::JSON::write_json_file($utheses_errors_json_path, $errors); @@ -2639,7 +2711,7 @@ sub ot2ut push (@synced, $el); $el->{errors}= $errors; $col_sync->insert($el); - sleep(5); + sleep(2); } else { @@ -2677,12 +2749,17 @@ old format 2019-11..2020-01 my ($status, $import_status, $utheses_id, $response_msg, $alerts)= map { $result_data->{$_} } qw(status importStatus uthesesId responseMsg alerts); + my $td_start= time()-$t_start; + my $td_curl= time()-$t_curl; + my $out_row= { eprint_id => $eprint_id, + eprint_status => $eprint_status, lastmod => $lastmod, - ts_upload => $ts_upload, context => $ot2ut_context, + ts_upload => $ts_upload, + td_total => $td_start, error_code => 'ok', error_cnt => 0, utheses_id => $utheses_id, @@ -2692,19 +2769,10 @@ old format 2019-11..2020-01 push (@synced, $out_row); $col_sync->insert($out_row); - my $td_start= time()-$t_start; - my $td_curl= time()-$t_curl; - my $msg= - { - message => "upload success: eprint_id=[$eprint_id] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl", - priority => 'normal', - state => 'new', - to => 'oma' - }; - $col_msg->insert($msg); + send_message($col_msg, "upload success: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl"); } - sleep(5); + sleep(2); } } @@ -2713,15 +2781,16 @@ old format 2019-11..2020-01 last unless ($running); } - my @columns= qw( eprint_id lastmod ts_upload error_code error_cnt utheses_id container_pid container_result document_pid + my @columns= qw( eprint_id eprint_status lastmod context ts_upload td_total error_code error_cnt utheses_id container_pid container_result document_pid document_result activate_result import_code response_msg import_note ); my $fnm= sprintf('ot2ut_%s.tsv', ts_ISO()); Util::Matrix::save_hash_as_csv(\@columns, \@synced, $fnm, "\t", '', "\n", 1); - print __LINE__, " synced $cnt_synced objects; $cnt_errors objects with errors, see [$fnm]\n"; - \@synced; + my $res= "synced $cnt_synced objects; $cnt_errors objects with errors"; + print __LINE__, " $res, see [$fnm]\n"; + (\@synced, $res); } sub generate_utheses_metadata @@ -3209,6 +3278,11 @@ print __LINE__, " thesis: ", Dumper (\%thesis); (\@errors, \@warnings, \%thesis); } +sub cleanup_name +{ + my $s= shift; +} + sub get_names_for_role { my $row= shift;