diff --git a/Ccs/ccs_function_lib.py b/Ccs/ccs_function_lib.py index 27c37669e95e2469d878b809c6d8c322a4e509f8..8b3976f6fa1386e11fc2e10541a8960e8e003d5d 100644 --- a/Ccs/ccs_function_lib.py +++ b/Ccs/ccs_function_lib.py @@ -1705,7 +1705,8 @@ def connect(pool_name, host, port, protocol='PUS', is_server=False, timeout=10, 'override_with_options': '1'})}) -def connect_tc(pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, is_server=False, options=''): +def connect_tc(pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, is_server=False, use_socket=None, + options=''): """ Accessibility function for 'connect_tc' in pus_datapool :param pool_name: @@ -1715,6 +1716,7 @@ def connect_tc(pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, :param protocol: :param timeout: :param is_server: + :param use_socket: :param options: :return: """ @@ -1727,7 +1729,8 @@ def connect_tc(pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, 'is_server': is_server, 'timeout': timeout, 'options': options, - 'drop_rx': drop_rx}) + 'drop_rx': drop_rx, + 'use_socket': use_socket}) pmgr.Functions('connect_tc', pool_name, host, port, {'kwargs': dbus.Dictionary({'options': kwarguments, 'override_with_options': '1'})}) diff --git a/Ccs/pus_datapool.py b/Ccs/pus_datapool.py index ea272c80397a77d922f6267a7799df4ca15cdd49..a2aa5aed91c6508494d1602ae35c26bd1563e8f8 100644 --- a/Ccs/pus_datapool.py +++ b/Ccs/pus_datapool.py @@ -458,7 +458,7 @@ class DatapoolManager: return def connect_tc(self, pool_name, host, port, protocol='PUS', drop_rx=True, timeout=10, is_server=False, options='', - override_with_options=False): + override_with_options=False, use_socket=None): # override variables that are set in the options string if bool(override_with_options): @@ -468,32 +468,46 @@ class DatapoolManager: drop_rx = override.get('drop_rx', drop_rx) timeout = override.get('timeout', timeout) is_server = override.get('is_server', is_server) + use_socket = override.get('use_socket', use_socket) # options = override.get('options', options) - if is_server: - socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - socketserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - socketserver.settimeout(timeout) - socketserver.bind((host, port)) - socketserver.listen() - try: - sockfd, addr = socketserver.accept() - except socket.timeout: - socketserver.close() - self.logger.error("Connection timeout, no client has connected to {}:{}".format(host, port)) - return + if use_socket is not None: + if isinstance(use_socket, socket.socket): + sockfd = use_socket + elif isinstance(use_socket, str): + try: + sockfd = self.connections[use_socket]['socket'] + except KeyError: + self.logger.error('No existing socket found for "{}"'.format(use_socket)) + raise KeyError('No existing socket found for "{}"'.format(use_socket)) + else: + self.logger.error('use_socket must be of type str or socket') + raise TypeError('use_socket must be of type str or socket') else: - sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sockfd.settimeout(timeout) + if is_server: + socketserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + socketserver.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + socketserver.settimeout(timeout) + socketserver.bind((host, port)) + socketserver.listen() + try: + sockfd, addr = socketserver.accept() + except socket.timeout: + socketserver.close() + self.logger.error("Connection timeout, no client has connected to {}:{}".format(host, port)) + return + else: + sockfd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sockfd.settimeout(timeout) - if pool_name in self.tc_connections: - self.logger.warning('Pool "{}" already has TC connection to {}!'.format(pool_name, self.tc_connections[pool_name]['socket'].getpeername())) - return - try: - sockfd.connect((host, port)) - except ConnectionRefusedError: - self.logger.error("Connection to {}:{} refused".format(host, port)) - return + if pool_name in self.tc_connections: + self.logger.warning('Pool "{}" already has TC connection to {}!'.format(pool_name, self.tc_connections[pool_name]['socket'].getpeername())) + return + try: + sockfd.connect((host, port)) + except ConnectionRefusedError: + self.logger.error("Connection to {}:{} refused".format(host, port)) + return self.tc_sock = sockfd self.tc_name = pool_name @@ -931,7 +945,7 @@ class DatapoolManager: buf = ack # PUS, just read packets and discard them - else: + elif protocol.lower() == 'pus': pkt_size_stream = sockfd.recv(6) while len(pkt_size_stream) < 6: data = sockfd.recv(1) @@ -947,6 +961,10 @@ class DatapoolManager: break buf += d + # any other protocol, just read from socket and discard + else: + buf = sockfd.recv(1024) + with self.lock: self.databuflen += len(buf) except socket.timeout: