From 4c384e929e387eee2281ab9b835bca929482a6de Mon Sep 17 00:00:00 2001
From: Marko Mecina <marko.mecina@univie.ac.at>
Date: Fri, 20 Jan 2023 17:51:45 +0100
Subject: [PATCH] improve SQL timeout handling

---
 Ccs/ccs_function_lib.py | 19 +++++++++++--------
 Ccs/pus_datapool.py     | 37 ++++++++++++++++++++++++-------------
 2 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py
index a5636a8..4743168 100644
--- a/Ccs/ccs_function_lib.py
+++ b/Ccs/ccs_function_lib.py
@@ -1567,7 +1567,7 @@ def get_cuctime(tml):
 
 
 def get_pool_rows(pool_name, check_existence=False):
-    dbcon = scoped_session_storage
+    dbcon = scoped_session_storage()
 
     if check_existence:
         check = dbcon.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name)
@@ -1597,7 +1597,7 @@ def get_param_values(tmlist=None, hk=None, param=None, last=0, numerical=False,
     if tmlist is None and pool_name is not None:
         tmlist = get_pool_rows(pool_name, check_existence=True)
 
-    dbcon = scoped_session_idb
+    dbcon = scoped_session_idb()
     if hk is None:
         que = 'SELECT plf.plf_name,plf.plf_spid,plf.plf_offby,plf.plf_offbi,pcf.pcf_ptc,pcf.pcf_pfc,pcf.pcf_unit,\
                    pcf.pcf_descr,pid.pid_apid,pid.pid_type,pid.pid_stype,pid.pid_descr,pid.pid_pi1_val from pcf\
@@ -1932,7 +1932,11 @@ def Tcbuild(cmd, *args, sdid=0, ack=None, no_check=False, hack_value=None, sourc
     # params = dbcon.execute(que).fetchall()
     # dbcon.close()
 
-    params = _get_tc_params(cmd)
+    try:
+        params = _get_tc_params(cmd)
+    except SQLOperationalError:
+        scoped_session_idb.close()
+        params = _get_tc_params(cmd)
 
     try:
         st, sst, apid, npars = params[0][:4]
@@ -2026,9 +2030,8 @@ def _get_tc_params(cmd, paf_cal=False):
               'cdf.cdf_cname=ccf.ccf_cname LEFT JOIN cpc ON cpc.cpc_pname=cdf.cdf_pname ' \
               'WHERE BINARY ccf_descr="%s"' % cmd
 
-    dbcon = scoped_session_idb
-    params = dbcon.execute(que).fetchall()
-    dbcon.close()
+    params = scoped_session_idb.execute(que).fetchall()
+    scoped_session_idb.close()
     return params
 
 
@@ -2528,7 +2531,7 @@ def _tcsend_common(tc_bytes, apid, st, sst, sleep=0., pool_name='LIVE', pkt_time
     # More specific Logging format that is compatible with the TST
     log_dict = dict([('st', st),('sst', sst),('ssc', ssc),('apid', apid),('timestamp', t)])
     json_string = '{} {}'.format('#SENT TC', json.dumps(log_dict))
-    logger.info(json_string)
+    logger.debug(json_string)
     # time.sleep(sleep)
     return apid, ssc, t
 
@@ -2630,7 +2633,7 @@ def Tcsend_bytes(tc_bytes, pool_name='LIVE', pmgr_handle=None):
     try:
         pmgr.Functions('tc_send', pool_name, tc_bytes, signature='ssay')
         return True
-    except dbus.DBusException:
+    except (dbus.DBusException, AttributeError):
         logger.error('Failed to send packet of length {} to {}!'.format(len(tc_bytes), pool_name))
         return False
     # logger.debug(msg)
diff --git a/Ccs/pus_datapool.py b/Ccs/pus_datapool.py
index b2d62f6..abbc97c 100644
--- a/Ccs/pus_datapool.py
+++ b/Ccs/pus_datapool.py
@@ -58,6 +58,8 @@ PLM_PKT_PREFIX_TC = packet_config.PLM_PKT_PREFIX_TC
 PLM_PKT_PREFIX_TC_SEND = packet_config.PLM_PKT_PREFIX_TC_SEND
 PLM_PKT_SUFFIX = packet_config.PLM_PKT_SUFFIX
 
+SOCK_TO_LIMIT = 900  # number of tm_recv socket timeouts before SQL session reconnect
+
 communication = {}
 for name in cfg['ccs-dbus_names']:
     communication[name] = 0
@@ -665,7 +667,8 @@ class DatapoolManager:
         # if not pool_name in self.state:
         #     start_new = True
 
-        new_session = self.session_factory_storage
+        new_session = self.session_factory_storage()
+        _tocnt = 0  # timeout counter
         creation_time = round(time.time())
 
         # If no TC Pool has been started start new one
@@ -675,7 +678,6 @@ class DatapoolManager:
                 modification_time=creation_time,
                 protocol=protocol)
             new_session.add(pool_row)
-            # new_session.flush()
             new_session.commit()
             self.trashbytes[pool_name] = 0
             self.state[pool_name] = 1
@@ -807,11 +809,9 @@ class DatapoolManager:
                             tail = b''
                     pkt_size_stream = tail
 
-                # buf = sockfd.recv(self.pckt_size_max)
                 if not buf:
                     break
-                with self.lock:
-                    self.databuflen += len(buf)
+
                 if not drop_rx:
                     if pckt_filter:
                         for pkt in self.extract_pus(buf):
@@ -823,33 +823,44 @@ class DatapoolManager:
                                                                                 checkcrc=False)
                     else:
                         self.decode_tmdump_and_process_packets_internal(buf, process_tm, checkcrc=False)
+
+                    _tocnt = 0
+
+                with self.lock:
+                    self.databuflen += len(buf)
+
             except socket.timeout as e:
                 self.logger.debug('Socket timeout ({}:{})'.format(host, port))
+                # reconnect the SQL session handle after more than SOCK_TO_LIMIT socket timeouts to avoid an SQL timeout
+                _tocnt += 1
                 new_session.commit()
-                continue
+                if _tocnt > SOCK_TO_LIMIT:
+                    new_session.close()
+                    pool_row = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name,
+                                                                         DbTelemetryPool.modification_time == creation_time).first()
+                    self.logger.debug('SQL session reconnected, SOCK_SQL_TO_LIMIT reached ({})'.format(SOCK_TO_LIMIT))
+                    _tocnt = 0
             except SQLOperationalError as e:
                 self.logger.warning(e)
                 new_session.close()
                 pool_row = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name,
                                                                      DbTelemetryPool.modification_time == creation_time).first()
+                pkt_size_stream = buf + pkt_size_stream  # re-read buffer in next loop since DB insertion has failed
             except socket.error as e:
                 self.logger.error('Socket error ({}:{})'.format(host, port))
                 self.logger.exception(e)
-                # self.logger.error('ERROR: socket error')
-                self.connections[pool_name]['recording'] = False
                 break
             except struct.error as e:
                 self.logger.error('Lost connection to {}:{}'.format(host, port))
                 self.logger.exception(e)
-                self.connections[pool_name]['recording'] = False
                 break
             except Exception as e:
-                self.logger.error(e)
-                self.connections[pool_name]['recording'] = False
+                self.logger.exception(e)
                 break
-        # if self.state[pool_row.pool_name] % 10 != 0:
+
         new_session.commit()
         new_session.close()
+        self.connections[pool_name]['recording'] = False
         self.logger.warning('Disconnected from ' + str(host) + ':' + str(port))
         sockfd.close()
 
@@ -1015,7 +1026,7 @@ class DatapoolManager:
             return
         # self.logger.debug('tc_send: tc_connections = {}'.format(self.tc_connections))
 
-        new_session = self.session_factory_storage
+        new_session = self.session_factory_storage()
         pool_row = new_session.query(DbTelemetryPool).filter(DbTelemetryPool.pool_name == pool_name).first()
 
         # TC normally just take the information which pool it is from the first Row, But if a Pool is given with only
-- 
GitLab