Skip to content
Snippets Groups Projects
Commit 71d50faa authored by Gerhard Gonter's avatar Gerhard Gonter :speech_balloon:
Browse files

new experiments with replication policy

parent 2c90d6ef
No related branches found
No related tags found
No related merge requests found
......@@ -657,7 +657,7 @@ sub remove_from_store
{
my ($id_str, $path)= @$item;
print "drop: key=[$id_str] store=[$store] path=[$path]\n";
$_cat->remove_one ( { 'key' => $id_str, 'type' => $objreg->{'key'},
$_cat->remove ( { 'key' => $id_str, 'type' => $objreg->{'key'},
'store' => $store, 'path' => $path } );
}
return {}; # TODO: TA::Hasher variant returns dropped items
......@@ -767,11 +767,50 @@ sub check_policy
print __LINE__, " item_count: $item_count\n";
# print __LINE__, " items: ", main::Dumper (\%items);
my %copy;
my $cnt_copy= 0;
foreach my $kv (keys %items)
{
my $item= $items{$kv};
check_replication_policy ($replica_map, $item);
my $res= check_replication_policy ($replica_map, $item);
print __LINE__, " res: ", main::Dumper ($res);
if (defined ($res->{prefered_replica_set}))
{
foreach my $diag (@{$res->{diag}})
{
my $code= $diag->{diag};
if ($code eq 'ok') {} # NOP
elsif ($code eq 'replica_set_incomplete')
{
# TODO: find the optimal source, e.g. one, that is currently readable!
my @sources= sort keys %{$diag->{available}};
my $s_repo= $sources[0];
my $s_path= $diag->{available}->{$s_repo}->[0];
print __LINE__, " copy [$s_repo] [$s_path] to [", join (', ', @{$diag->{missing}}), "]\n";
# $copy{$s_repo}->{$s_path}= $diag->{missing};
foreach my $m (@{$diag->{missing}})
{
push (@{$copy{$s_repo}->{$m}}, $s_path);
}
}
else
{
print "diag code unknown: [$code]\n";
}
}
}
}
print __LINE__, " check_policy: caller= ", join (' ', caller()), "\n";
(\%copy);
}
sub get_replica_map
......@@ -815,7 +854,7 @@ sub check_replication_policy
my @paths= $stores->{$store};
if (@paths > 1)
{
push (@diag, [ 'store_duplicate', $store ]);
push (@diag, { diag => 'store_duplicate', store => $store });
}
if (exists ($map->{stores}->{$store}))
......@@ -825,14 +864,14 @@ sub check_replication_policy
}
else
{
push (@diag, [ 'store_not_in_replica_set', $store ]);
push (@diag, { diag => 'store_not_in_replica_set', store => $store });
}
}
my $prefered_replica_set;
my @replica_sets= sort keys %replica_sets;
if (@replica_sets == 0) { push (@diag, [ 'no_replica_set' ]); }
elsif (@replica_sets > 1) { push (@diag, [ 'multiple_replica_sets', join (' ', @replica_sets) ]); }
if (@replica_sets == 0) { push (@diag, { diag => 'no_replica_set' }); }
elsif (@replica_sets > 1) { push (@diag, { diag => 'multiple_replica_sets', replica_sets => \@replica_sets }); }
elsif (@replica_sets == 1)
{ # NOTE: the object is in one replica set, now check, if it is present in all stores;
......@@ -849,31 +888,55 @@ sub check_replication_policy
if ($replica_sets{$prefered_replica_set} != $map->{store_count}->{$prefered_replica_set})
{
my @missing;
my (@missing, %available);
my $ms= $map->{stores};
foreach my $store (sort keys %$ms)
{
push (@missing, $store) if ($ms->{$store} eq $prefered_replica_set && !exists ($stores->{$store}));
if ($ms->{$store} eq $prefered_replica_set)
{
if (exists ($stores->{$store}))
{
$available{$store}= $stores->{$store};
}
else
{
push (@missing, $store);
}
}
}
push (@diag, [ 'replica_set_incomplete', join (' ', @missing) ]);
push (@diag, { diag => 'replica_set_incomplete', missing => \@missing, available => \%available });
}
}
# NOTE: there are different kings of problems:
# * an object with too few replicas in a given replica set is a problem
# NOTE: there are different kinds of problems:
# * an object with too few replicas in a given replica set is a problem that should be fixed
# * an object with all replicas but with extra copies is not really a problem, but should be noted
if (@diag)
{
=begin comment
print "ATTN: replication policy problem; prefered_replica_set=[$prefered_replica_set]\n";
print join ("\n", map { 'NOTE: ' . join (' ', @$_) } @diag), "\n";
print join ("\n", map { 'NOTE: ' . join (' ', %$_) } @diag), "\n";
print "caller: ", join (' ', caller()), "\n";
print __LINE__, ' item: ', main::Dumper ($item);
=end comment
=cut
}
else
{
push (@diag, { diag => 'ok', line => __LINE__ });
}
return
{
prefered_replica_set => $prefered_replica_set,
diag => \@diag,
item => $item,
}
}
......
......@@ -271,7 +271,23 @@ elsif ($op_mode eq 'policy')
# my $catalog= $objreg->{'cfg'}->{'catalog'};
# &usage ('no catalog found in config') unless (defined ($catalog));
$objreg->check_policy ();
my ($copy)= $objreg->check_policy ();
print __LINE__, " copy: ", main::Dumper ($copy);
foreach my $s_repo (sort keys %$copy)
{
my $s= $copy->{$s_repo};
foreach my $d_repo (sort keys %$s)
{
my @s_paths= sort @{$s->{$d_repo}};
if (open (FO_COPY, '>:utf8', join ('_', '@copy', $s_repo, 'to', $d_repo)))
{
foreach my $p (@s_paths) { print FO_COPY $p, "\0" }
close (FO_COPY);
}
}
}
}
# print "objreg: (after refresh)", Dumper ($objreg);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment