Skip to content
Snippets Groups Projects
Commit ce7906e6 authored by Gerhard Gonter's avatar Gerhard Gonter :speech_balloon:
Browse files

improved agent mode, added activity

parent dcc987a2
No related branches found
No related tags found
No related merge requests found
...@@ -168,6 +168,7 @@ my $db_name; ...@@ -168,6 +168,7 @@ my $db_name;
my $no_doi= 0; my $no_doi= 0;
my $ignore_errors= 0; my $ignore_errors= 0;
my $ot2ut_eprint_status= 'archive'; my $ot2ut_eprint_status= 'archive';
my $silent_upload_success= 0;
if ($0 eq './ot2ut.pl') { $op_mode= 'ot2ut'; $MAX_SYNC= 1; $do_upload= 1; } if ($0 eq './ot2ut.pl') { $op_mode= 'ot2ut'; $MAX_SYNC= 1; $do_upload= 1; }
...@@ -246,6 +247,13 @@ my $sleep_urn_request= 3; ...@@ -246,6 +247,13 @@ my $sleep_urn_request= 3;
my $running= 1; my $running= 1;
$SIG{INT}= sub { $running= 0; }; $SIG{INT}= sub { $running= 0; };
# Agent mode
my $db_ot2ut;
my $col_msg;
my $col_activity;
my $last_activity= 0;
my $activity_period= 300;
my $cnf= Util::JSON::read_json_file ($config_fnm); my $cnf= Util::JSON::read_json_file ($config_fnm);
if ($op_mode eq 'connect') if ($op_mode eq 'connect')
...@@ -2449,13 +2457,14 @@ Othes Migration Agent: listen to requests in MongoDB and perform them ...@@ -2449,13 +2457,14 @@ Othes Migration Agent: listen to requests in MongoDB and perform them
sub oma sub oma
{ {
my $ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database'); $db_ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database') unless (defined ($db_ot2ut));
my $col_sync= $db_ot2ut->get_collection('sync');
my $col_req= $db_ot2ut->get_collection('requests');
my $col_sync= $ot2ut->get_collection('sync'); activity({ activity => 'listening' });
my $col_msg= $ot2ut->get_collection('messages');
my $col_req= $ot2ut->get_collection('requests');
# send_message($col_msg, "oma is listening..."); # send_message("oma is listening...");
REQ: while($running) REQ: while($running)
{ {
my $row= $col_req->find_one({ agent => 'oma', status => 'new' }); my $row= $col_req->find_one({ agent => 'oma', status => 'new' });
...@@ -2463,6 +2472,7 @@ sub oma ...@@ -2463,6 +2472,7 @@ sub oma
{ {
print __LINE__, " oma sleeping until ", scalar localtime(time()+$oma_sleep_time), "\n"; print __LINE__, " oma sleeping until ", scalar localtime(time()+$oma_sleep_time), "\n";
sleep ($oma_sleep_time); sleep ($oma_sleep_time);
activity({ activity => 'sleeping'}) if ($last_activity + $activity_period <= time());
next REQ; next REQ;
} }
...@@ -2483,9 +2493,12 @@ sub oma ...@@ -2483,9 +2493,12 @@ sub oma
$col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }}); $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }});
send_message($col_msg, "send_batch: sending $bs objects in $ot2ut_eprint_status to $ot2ut_context"); my $msg= "send_batch: sending $bs objects in $ot2ut_eprint_status to $ot2ut_context";
activity({ activity => 'send_batch', msg => $msg});
send_message($msg);
my ($synced, $res)= ot2ut(); my ($synced, $res)= ot2ut();
send_message($col_msg, "send_batch: $res"); send_message("send_batch: $res");
$new_status= 'done' if (@$synced); $new_status= 'done' if (@$synced);
} }
...@@ -2494,21 +2507,28 @@ sub oma ...@@ -2494,21 +2507,28 @@ sub oma
$ignore_errors= 1; $ignore_errors= 1;
$col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }}); $col_req->update({ _id => $row->{_id}}, { '$set' => { status => 'in_progress' }});
my $cnt= @{$row->{ids}}; my $cnt= @{$row->{ids}};
send_message($col_msg, "send_ids: sending $cnt objects to $ot2ut_context");
my $msg= "send_ids: sending $cnt objects to $ot2ut_context";
activity({ activity => 'send_batch', msg => $msg});
send_message($msg);
my ($synced, $res)= ot2ut(@{$row->{ids}}); my ($synced, $res)= ot2ut(@{$row->{ids}});
send_message($col_msg, "send_ids: $res"); send_message("send_ids: $res");
$new_status= 'done' if (@$synced); $new_status= 'done' if (@$synced);
} }
$col_req->update({ _id => $row->{_id}}, { '$set' => { status => $new_status }}); $col_req->update({ _id => $row->{_id}}, { '$set' => { status => $new_status }});
activity({ activity => 'listening'});
} }
} }
sub send_message sub send_message
{ {
my $col_msg= shift or return undef;
my $text= shift or return undef; my $text= shift or return undef;
$col_msg= $db_ot2ut->get_collection('messages') unless (defined ($col_msg));
return undef unless (defined ($col_msg));
print __LINE__, " sending message [$text]\n"; print __LINE__, " sending message [$text]\n";
my $msg= my $msg=
{ {
...@@ -2520,6 +2540,25 @@ sub send_message ...@@ -2520,6 +2540,25 @@ sub send_message
$col_msg->insert($msg); $col_msg->insert($msg);
} }
sub activity
{
my $data= shift;
unless (defined ($col_activity))
{
$col_activity= $db_ot2ut->get_collection('activity');
print __LINE__, " starting activity\n";
}
return undef unless (defined ($col_activity));
$data->{agent}= 'oma' unless (exists($data->{agent}));
$data->{e}= time() unless (exists($data->{e}));
$data->{ts_gmt}= Util::ts::ts_ISO_gmt() unless (exists($data->{ts_gmt}));
$col_activity->update( { agent => 'oma' }, $data, { upsert => 1 } );
$last_activity= time();
}
sub ot2ut sub ot2ut
{ {
my @eprint_ids= @_; my @eprint_ids= @_;
...@@ -2528,9 +2567,9 @@ sub ot2ut ...@@ -2528,9 +2567,9 @@ sub ot2ut
# my $irma_na= get_irma_na_db($cnf); # my $irma_na= get_irma_na_db($cnf);
my $epr= get_eprints_db($cnf); my $epr= get_eprints_db($cnf);
my $ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database'); $db_ot2ut= IRMA::db::get_any_db($cnf, 'ot2ut_database') unless (defined ($db_ot2ut));
my $col_sync= $ot2ut->get_collection('sync'); my $col_sync= $db_ot2ut->get_collection('sync');
my $col_msg= $ot2ut->get_collection('messages'); my $col_policy_utheses= $db_ot2ut->get_collection('policy.utheses');
unless (defined ($utheses_faculty_map)) unless (defined ($utheses_faculty_map))
{ {
...@@ -2540,7 +2579,6 @@ sub ot2ut ...@@ -2540,7 +2579,6 @@ sub ot2ut
$utheses_faculty_map= \%utheses_faculty_map; $utheses_faculty_map= \%utheses_faculty_map;
# Util::JSON::write_json_file('@facultymap.json', $utheses_faculty_map); # Util::JSON::write_json_file('@facultymap.json', $utheses_faculty_map);
# $Data::Dumper::Sortkeys= 1;
# print __LINE__, " utheses_faculty_map: ", Dumper($utheses_faculty_map); exit; # print __LINE__, " utheses_faculty_map: ", Dumper($utheses_faculty_map); exit;
} }
...@@ -2551,7 +2589,9 @@ sub ot2ut ...@@ -2551,7 +2589,9 @@ sub ot2ut
if ($ot2ut_eprint_status eq 'archive') if ($ot2ut_eprint_status eq 'archive')
{ {
$res1= $epr->fetch_data('archive', { doi => 1 }); my @fetch_pars;
push (@fetch_pars, { doi => 1 }) unless ($no_doi);
$res1= $epr->fetch_data('archive', @fetch_pars);
} }
elsif ($ot2ut_eprint_status eq 'buffer') elsif ($ot2ut_eprint_status eq 'buffer')
{ {
...@@ -2575,11 +2615,16 @@ sub ot2ut ...@@ -2575,11 +2615,16 @@ sub ot2ut
my @synced= (); my @synced= ();
my $cnt_synced= 0; my $cnt_synced= 0;
my $cnt_errors= 0; my $cnt_errors= 0;
print __LINE__, " MAX_SYNC=$MAX_SYNC\n"; my $cnt_eprint_ids= @eprint_ids;
print __LINE__, " ot2ut: ot2ut_eprint_status=$ot2ut_eprint_status no_doi=$no_doi cnt_eprint_ids=$cnt_eprint_ids MAX_SYNC=$MAX_SYNC\n";
sleep(3);
foreach my $eprint_id (@eprint_ids) foreach my $eprint_id (@eprint_ids)
{ {
last if (!$running);
last if (defined ($MAX_SYNC) && $cnt_synced >= $MAX_SYNC); last if (defined ($MAX_SYNC) && $cnt_synced >= $MAX_SYNC);
activity({ activity => 'ot2ut'}) if ($last_activity + $activity_period <= time());
my $t_start= time(); my $t_start= time();
my $sync_info= $col_sync->find_one({ eprint_id => $eprint_id, context => $ot2ut_context }); my $sync_info= $col_sync->find_one({ eprint_id => $eprint_id, context => $ot2ut_context });
...@@ -2651,7 +2696,7 @@ sub ot2ut ...@@ -2651,7 +2696,7 @@ sub ot2ut
push (@synced, $el); push (@synced, $el);
$col_sync->insert($el); $col_sync->insert($el);
send_message($col_msg, "upload error: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] [conversion errors]"); send_message("upload error: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] [conversion errors]");
my $utheses_errors_json_path= 'othes/utheses_json/errors/' . $eprint_id . '.json'; my $utheses_errors_json_path= 'othes/utheses_json/errors/' . $eprint_id . '.json';
Util::JSON::write_json_file($utheses_errors_json_path, $errors); Util::JSON::write_json_file($utheses_errors_json_path, $errors);
...@@ -2771,7 +2816,7 @@ old format 2019-11..2020-01 ...@@ -2771,7 +2816,7 @@ old format 2019-11..2020-01
push (@synced, $out_row); push (@synced, $out_row);
$col_sync->insert($out_row); $col_sync->insert($out_row);
# send_message($col_msg, "upload success: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl"); send_message("upload success: eprint_id=[$eprint_id] eprint_status=[$eprint_status] lastmod=[$lastmod] context=[$ot2ut_context] utheses_id=[$utheses_id] time_total=$td_start time_upload=$td_curl") unless ($silent_upload_success);
} }
sleep(2); sleep(2);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment