diff --git a/eprints1.pl b/eprints1.pl index 313470d8e3b12749f012d8a912f7aa00f62e76ce..6043471a15180f1ff4055a4d271b4b9982661673 100755 --- a/eprints1.pl +++ b/eprints1.pl @@ -168,6 +168,7 @@ my $db_name; my $no_doi= 0; my $ignore_errors= 0; my $ot2ut_eprint_status= 'archive'; +my $silent_upload_success= 0; if ($0 eq './ot2ut.pl') { $op_mode= 'ot2ut'; $MAX_SYNC= 1; $do_upload= 1; } @@ -246,6 +247,13 @@ my $sleep_urn_request= 3; my $running= 1; $SIG{INT}= sub { $running= 0; }; +# Agent mode +my $db_ot2ut; +my $col_msg; +my $col_activity; +my $last_activity= 0; +my $activity_period= 300; + my $cnf= Util::JSON::read_json_file ($config_fnm); if ($op_mode eq 'connect') @@ -2449,13 +2457,14 @@ Othes Migration Agent: listen to requests in MongoDB and perform them sub oma { - my $ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database'); + $db_ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database') unless (defined ($db_ot2ut)); + + my $col_sync= $db_ot2ut->get_collection('sync'); + my $col_req= $db_ot2ut->get_collection('requests'); - my $col_sync= $ot2ut->get_collection('sync'); - my $col_msg= $ot2ut->get_collection('messages'); - my $col_req= $ot2ut->get_collection('requests'); + activity({ activity => 'listening' }); - # send_message($col_msg, "oma is listening..."); + # send_message("oma is listening..."); REQ: while($running) { my $row= $col_req->find_one({ agent => 'oma', status => 'new' }); @@ -2463,6 +2472,7 @@ sub oma { print __LINE__, " oma sleeping until ", scalar localtime(time()+$oma_sleep_time), "\n"; sleep ($oma_sleep_time); + activity({ activity => 'sleeping'}) if ($last_activity + $activity_period <= time()); next REQ; } @@ -2483,9 +2493,12 @@ sub oma $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }}); - send_message($col_msg, "send_batch: sending $bs objects in $ot2ut_eprint_status to $ot2ut_context"); + my $msg= "send_batch: sending $bs objects in $ot2ut_eprint_status to $ot2ut_context"; + activity({ activity => 'send_batch', msg => $msg}); + send_message($msg); + my ($synced, $res)= ot2ut(); - send_message($col_msg, "send_batch: $res"); + send_message("send_batch: $res"); $new_status= 'done' if (@$synced); } @@ -2494,21 +2507,28 @@ sub oma $ignore_errors= 1; $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }}); my $cnt= @{$row->{ids}}; - send_message($col_msg, "send_ids: sending $cnt objects to $ot2ut_context"); + + my $msg= "send_ids: sending $cnt objects to $ot2ut_context"; + activity({ activity => 'send_batch', msg => $msg}); + send_message($msg); + my ($synced, $res)= ot2ut(@{$row->{ids}}); - send_message($col_msg, "send_ids: $res"); + send_message("send_ids: $res"); $new_status= 'done' if (@$synced); } $col_req->update({ _id => $row->{_id}}, { '$set' => { status => $new_status }}); + activity({ activity => 'listening'}); } } sub send_message { - my $col_msg= shift or return undef; my $text= shift or return undef; + $col_msg= $db_ot2ut->get_collection('messages') unless (defined ($col_msg)); + return undef unless (defined ($col_msg)); + print __LINE__, " sending message [$text]\n"; my $msg= { @@ -2520,6 +2540,25 @@ sub send_message $col_msg->insert($msg); } +sub activity +{ + my $data= shift; + + unless (defined ($col_activity)) + { + $col_activity= $db_ot2ut->get_collection('activity'); + print __LINE__, " starting activity\n"; + } + return undef unless (defined ($col_activity)); + + $data->{agent}= 'oma' unless (exists($data->{agent})); + $data->{e}= time() unless (exists($data->{e})); + $data->{ts_gmt}= Util::ts::ts_ISO_gmt() unless (exists($data->{ts_gmt})); + + $col_activity->update( { agent => 'oma' }, $data, { upsert => 1 } ); + $last_activity= time(); +} + sub ot2ut { my @eprint_ids= @_; @@ -2528,9 +2567,9 @@ sub ot2ut # my $irma_na= get_irma_na_db($cnf); my $epr= get_eprints_db($cnf); - my $ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database'); - my $col_sync= $ot2ut->get_collection('sync'); - my $col_msg= $ot2ut->get_collection('messages'); + $db_ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database') unless (defined ($db_ot2ut)); + my $col_sync= $db_ot2ut->get_collection('sync'); + my $col_policy_utheses= $db_ot2ut->get_collection('policy.utheses'); unless (defined ($utheses_faculty_map)) { @@ -2540,7 +2579,6 @@ sub ot2ut $utheses_faculty_map= \%utheses_faculty_map; # Util::JSON::write_json_file('@facultymap.json', $utheses_faculty_map); - # $Data::Dumper::Sortkeys= 1; # print __LINE__, " utheses_faculty_map: ", Dumper($utheses_faculty_map); exit; } @@ -2551,7 +2589,9 @@ sub ot2ut if ($ot2ut_eprint_status eq 'archive') { - $res1= $epr->fetch_data('archive', { doi => 1 }); + my @fetch_pars; + push (@fetch_pars, { doi => 1 }) unless ($no_doi); + $res1= $epr->fetch_data('archive', @fetch_pars); } elsif ($ot2ut_eprint_status eq 'buffer') { @@ -2575,11 +2615,16 @@ sub ot2ut my @synced= (); my $cnt_synced= 0; my $cnt_errors= 0; - print __LINE__, " MAX_SYNC=$MAX_SYNC\n"; + my $cnt_eprint_ids= @eprint_ids; + print __LINE__, " ot2ut: ot2ut_eprint_status=$ot2ut_eprint_status no_doi=$no_doi cnt_eprint_ids=$cnt_eprint_ids MAX_SYNC=$MAX_SYNC\n"; + sleep(3); foreach my $eprint_id (@eprint_ids) { + last if (!$running); last if (defined ($MAX_SYNC) && $cnt_synced >= $MAX_SYNC); + activity({ activity => 'ot2ut'}) if ($last_activity + $activity_period <= time()); + my $t_start= time(); my $sync_info= $col_sync->find_one({ eprint_id => $eprint_id, context => $ot2ut_context }); @@ -2651,7 +2696,7 @@ sub ot2ut push (@synced, $el); $col_sync->insert($el); - send_message($col_msg, "upload error: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] [conversion errors]"); + send_message("upload error: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] [conversion errors]"); my $utheses_errors_json_path= 'othes/utheses_json/errors/' . $eprint_id . '.json'; Util::JSON::write_json_file($utheses_errors_json_path, $errors); @@ -2771,7 +2816,7 @@ old format 2019-11..2020-01 push (@synced, $out_row); $col_sync->insert($out_row); - # send_message($col_msg, "upload success: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl"); + send_message("upload success: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl") unless ($silent_upload_success); } sleep(2);