From 1fb302260725ffd8d66c47811f990d8c6ffdba38 Mon Sep 17 00:00:00 2001
From: Gerhard Gonter <ggonter@gmail.com>
Date: Thu, 9 Mar 2023 13:06:45 +0100
Subject: [PATCH] added Net/fanout.pm from ubm repo

---
 modules/util/Net/fanout.pm | 194 +++++++++++++++++++++++++++++++++++++
 1 file changed, 194 insertions(+)
 create mode 100644 modules/util/Net/fanout.pm

diff --git a/modules/util/Net/fanout.pm b/modules/util/Net/fanout.pm
new file mode 100644
index 0000000..36331eb
--- /dev/null
+++ b/modules/util/Net/fanout.pm
@@ -0,0 +1,194 @@
+#!/usr/bin/perl
+
+use strict;
+
+package Net::fanout;
+
+use IO::Socket::INET;
+use FileHandle;
+
+my @config_pars= qw(PeerHost PeerAddr PeerPort Blocking Proto);
+my $MAX_RETRIES= 100;
+
+# debugging
+my $show_dots= 0;
+my $dots= 0;
+
+sub new
+{
+  my $class= shift;
+
+  my $self=
+  {
+    PeerPort => 1986,
+    Blocking => 0,
+    Proto => 'tcp',
+    _connected => 0,
+    _subscribed => {},
+    _backlog => [], # queued messages
+  };
+
+  bless ($self, $class);
+  $self->configure(@_);
+
+  $self;
+}
+
+sub configure
+{
+  my $self= shift;
+
+  my %par= (ref($_[0]) eq '') ? @_ : %{$_[0]};
+  my $connect= 0;
+  foreach my $par (keys %par)
+  {
+    $self->{$par}= $par{$par};
+    $connect= 1 if ($par eq 'PeerHost' || $par eq 'PeerAddr');
+  }
+
+  $self->connect() if ($connect);
+
+  $self;
+}
+
+sub connect
+{
+  my $self= shift;
+
+  my @par= ();
+  foreach my $par (@config_pars)
+  {
+    push (@par, $par => $self->{$par}) if (exists($self->{$par}) && defined ($self->{$par}));
+  }
+
+  $self->{_socket}= my $socket= new IO::Socket::INET (@par);
+
+  if (defined ($socket))
+  {
+    $socket->autoflush(1);
+    $self->{_connected}= 1;
+  }
+
+  $socket;
+}
+
+sub connected
+{
+  my $self= shift;
+
+  $self->{_connected};
+}
+
+sub subscribe
+{
+  my $self= shift;
+  my $channel= shift;
+
+  # print __LINE__, " subscribe channel=[$channel]\n";
+  $self->send("subscribe $channel\n");
+  $self->{_subscribed}->{$channel}= time();
+}
+
+sub unsubscribe
+{
+  my $self= shift;
+  my $channel= shift;
+
+  # print __LINE__, " unsubscribe channel=[$channel]\n";
+  $self->send("unsubscribe $channel\n");
+  delete ($self->{_subscribed}->{$channel});
+}
+
+sub subscribed
+{
+  my $self= shift;
+
+  my @channels= keys %{$self->{_subscribed}};
+  (wantarray) ? @channels : \@channels;
+}
+
+sub receive
+{
+  my $self= shift;
+
+  my $line;
+  if (@{$self->{_backlog}})
+  {
+    $line= shift(@{$self->{_backlog}});
+  }
+  else
+  {
+    my $data;
+    my $rc= $self->{_socket}->recv($data, 4096);
+    print __LINE__, " rc=[$rc]\n" if ($show_dots);
+    if (defined ($data) && $data ne '')
+    {
+      my @data= split("\n", $data);
+      $line= shift(@data);
+      push (@{$self->{_backlog}}, @data) if (@data);
+    }
+  }
+
+  # print __LINE__, " received line=[$line]\n";
+  my ($channel, $msg)= split('!', $line, 2);
+
+  # debugging only:
+  if ($channel eq '')
+  {
+    if ($show_dots)
+    {
+      autoflush STDOUT 1;
+      print '.';
+      if ($dots++ >= 80) { print "\n"; $dots= 0; }
+    }
+  }
+  else
+  {
+    if ($dots) { print "\n"; $dots= 0; }
+  }
+  ($channel, $msg);
+}
+
+sub announce
+{
+  my $self= shift;
+  my $channel= shift;
+  my @messages= shift;
+
+  my $count= 0;
+  foreach my $msg (@messages)
+  {
+    $self->send("announce $channel $msg\n");
+    $count++;
+  }
+
+  $count;
+}
+
+sub send
+{
+  my $self= shift;
+  my $msg= shift;
+
+  my $s= $self->{_socket};
+  $s= $self->connect() unless (defined ($s));
+
+  my $retries= $MAX_RETRIES;
+  RETRY: while ($retries--)
+  {
+    eval { $s->send($msg) };
+    if ($@)
+    {
+      if ($dots) { print "\n"; $dots= 0; }
+      sleep(0.05);
+      next RETRY;
+    }
+    last;
+  }
+  die ("can not send to " . join(' ', map { $_ => $self->{$_} } @config_pars) . ' ' . $@) unless ($retries > 0);
+  # print "sent [$msg]... $retries retries left\n";
+  $retries;
+}
+
+1;
+
-- 
GitLab