Python Forum
Very High CPU Usage [Networking, Threads]
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Very High CPU Usage [Networking, Threads]
#1
Hello everyone,

This script below is sort of a peer to peer local network communication system. I tried studying asyncio which i believed to be suitable for my purposes, however, it goes over my head right now. I used threads instead. The problem is my CPU usage really spikes up whenever the script runs like constantly 40% to 50%. I tried adding time.sleeps() where ever i felt the CPU might get clogged but to no avail. The script is as below,

import queue
import struct
import threading
import sys
import socket
import time
from PyQt5 import QtCore, QtWidgets
import mysql.connector


class NonBlockingInputTemplate:
    def __init__(self):
        self.lock = threading.Lock()
        self.__alive = False
        self.input_queue = queue.Queue()
        self.user_input = ''
        self.__input_cache = []

    def __str__(self):
        if self.__alive:
            return "%s %r" % (self.__class__, 'Status : Running')
        else:
            return "%s %r" % (self.__class__, 'Status : Stopped')

    def start(self):
        """Start Input Thread"""
        print('Starting Non Blocking I/O')
        self.__alive = True
        self.start_threads()

    def stop(self):
        """Stop Input Thread"""
        if self.__alive:
            print('Stopping Non Blocking I/O')
            self.__alive = False

    def start_threads(self):
        non_blocking_input_thread = threading.Thread(target=self.non_blocking_input, args=(), name='InputThread')
        read_input_thread = threading.Thread(target=self.read_input, args=(), name='ReadThread')
        self_defined_thread = threading.Thread(target=self.self_defined, args=(), name='SelfThread')
        non_blocking_input_thread.start()
        read_input_thread.start()
        self_defined_thread.start()

    def non_blocking_input(self):
        print('>', end='')
        while True:
            if not self.__alive:
                break
            self.input_queue.put(sys.stdin.read(1))

    def read_input(self):
        while True:
            if not self.__alive:
                break
            if not self.input_queue.empty():
                self.lock.acquire()
                while not self.input_queue.empty():
                    self.user_input = self.user_input + self.input_queue.get()
                self.user_input = self.user_input[:-1]
                if self.user_input:
                    self.__input_cache.append(self.user_input.replace('\n', ''))
                self.lock.release()
                self.user_input = ''
                print('>', end='')

    def self_defined(self):
        pass

    def get(self):
        """Retrieve the first item from the input queue"""
        if self.__input_cache:
            return self.__input_cache.pop(0)

    def cached(self):
        """Return the input queue"""
        return self.__input_cache

    def flush(self):
        """Flush the input queue"""
        self.__input_cache = []


class NonBlockingInput(NonBlockingInputTemplate):
    def __init__(self):
        super().__init__()
        # Additional functionality implemented here as per requirement, left out for now


class CollabTemplate:
    def __init__(self):
        self.receiver = socket.socket()
        self.connector = socket.socket()
        self.broadcast_s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.broadcast_r = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.multi_cast = '224.7.7.7'
        self.port = 25000
        self.m_port = 26000
        self.header_size = 6
        self.peers = []
        self.peers_details = {}
        self.peers_outgoing = {}
        self.lock = threading.Lock()
        self.data_cache = []

    def initialize(self):
        print('Collab Initialized...')
        accept_connections_thread = threading.Thread(target=self.accept_connections, args=(),
                                                     name='AcceptConnectionsThread')
        broadcast_self_thread = threading.Thread(target=self.broadcast_self, args=(), name='BroadcastSelfThread')
        search_peers_thread = threading.Thread(target=self.search_peers, args=(), name='SearchPeersThread')
        ping_routine_thread = threading.Thread(target=self.ping_routine, args=(), name='PingRoutineThread')
        pull_data_thread = threading.Thread(target=self.pull_data, args=(), name='PullDataThread')
        accept_connections_thread.start()
        broadcast_self_thread.start()
        search_peers_thread.start()
        ping_routine_thread.start()
        pull_data_thread.start()

    def accept_connections(self):
        self.receiver.bind((socket.gethostbyname(socket.gethostname()), self.port))
        self.lock.acquire()
        self.peers.append('self')
        self.peers_details['self'] = tuple((socket.gethostbyname(socket.gethostname()), ''))
        self.lock.release()
        self.receiver.listen()
        while True:
            conn, address = self.receiver.accept()
            if address[0] not in [value[0] for key, value in self.peers_details.items()]:
                self.lock.acquire()
                self.peers.append(conn)
                self.peers_details[conn] = address
                self.lock.release()
                try:
                    connector = socket.socket()
                    connector.connect((address[0], self.port))
                    # Experimental code starts here
                    threading.Thread(target=self.peer_thread, args=(connector,),
                                     name='PeerThread-' + str(address[0])).start()
                    # Experimental code ends here
                except ConnectionError as e:
                    print('Error while completing handshake')
                print('Connection received from {}'.format(self.peers_details[conn]))

    def ping_routine(self):
        while True:
            time.sleep(1)
            if self.peers:
                for conn in self.peers:
                    try:
                        if conn != 'self':
                            conn.sendall(bytes([0]))
                    except ConnectionResetError as e:
                        conn.shutdown(socket.SHUT_RDWR)
                        conn.close()
                        print(str(self.peers_details[conn][0]) + ' has disconnected')
                        self.lock.acquire()
                        self.peers.remove(conn)
                        self.peers_details.pop(conn)
                        self.lock.release()

    def peer_thread(self, conn):
        """Communication Protocol
            Header : 1-byte message identification appended with 5 digit message size /x0200005 : 5 char message
                /x00 --> Ping   ;   No response required as if unable to send byte connection will be closed
                /x01 --> Peer Discovery ;   /x01 --> Response to peer discovery
                /x02 --> Communication ;    Appended with the size of the message : Use int(base36encode, 36)
            Content : recv() will be called until the data received does not match the size of the data"""
        while True:
            try:
                peer_data = conn.recv(1)
                if peer_data:
                    if peer_data[0] == 0:
                        pass
                    elif peer_data[0] == 1:
                        pass
                    elif peer_data[0] == 2:
                        while len(peer_data) < self.header_size:
                            peer_data = peer_data + conn.recv(self.header_size - len(peer_data))
                        f_size = int(peer_data[1:self.header_size].decode(), 36)
                        stream_data = bytearray()
                        while len(peer_data) + len(stream_data) - self.header_size < f_size:
                            d_size = f_size - len(peer_data) + self.header_size + len(stream_data)
                            if d_size > 16777216:
                                stream_data.extend(conn.recv(16777216))
                            elif d_size > 4194304:
                                stream_data.extend(conn.recv(4194304))
                            elif d_size > 1048576:
                                stream_data.extend(conn.recv(1048576))
                            elif d_size > 262144:
                                stream_data.extend(conn.recv(262144))
                            elif d_size > 65536:
                                stream_data.extend(conn.recv(65536))
                            elif d_size > 16384:
                                stream_data.extend(conn.recv(16384))
                            elif d_size > 4096:
                                stream_data.extend(conn.recv(4096))
                            else:
                                stream_data.extend(conn.recv(d_size))
                        peer_data = peer_data + stream_data
                        if len(peer_data) - self.header_size > int(peer_data[1:self.header_size].decode(), 36):
                            peer_data = peer_data[0:f_size + self.header_size]
                        self.parser(conn.getpeername()[0], peer_data.decode()[self.header_size:])
            except ConnectionResetError as e:
                break

    def base36encode(self, number):
        alphabet, base36 = ['0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', '']
        number = abs(number)
        while number:
            number, i = divmod(number, 36)
            base36 = alphabet[i] + base36
        if not base36:
            base36 = 0
        return base36

    def broadcast_self(self):
        self.broadcast_s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, bytes([1]))
        self.broadcast_s.bind((socket.gethostbyname(socket.gethostname()), self.m_port))
        while True:
            time.sleep(5)
            self.broadcast_s.sendto(bytes([1]), (self.multi_cast, self.m_port))
            time.sleep(60)

    def search_peers(self):
        self.broadcast_r.bind(('', self.m_port))
        group = socket.inet_aton(self.multi_cast)
        mreq = struct.pack('4sL', group, socket.INADDR_ANY)
        self.broadcast_r.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
        while True:
            data, address = self.broadcast_r.recvfrom(1)
            if data == bytes([1]) and address[0] not in [value[0] for key, value in self.peers_details.items()]:
                print('Sending connection to', end='')
                print((address[0], str(self.port)))
                connector = socket.socket()
                connector.connect((address[0], self.port))
                threading.Thread(target=self.peer_thread, args=(connector,),
                                 name='PeerThread-' + str(address[0])).start()

    def push(self, data, data_type=None):
        '''data_type (Not Implemented Yet/ Will Implement If Required) :
                b --> Broadcast Message
                p --> Private Message'''
        if len(str(data)) > 0:
            data_size = str(self.base36encode(len(str(data))))
            if len(data_size) <= self.header_size:
                data_size = data_size.rjust(self.header_size - 1, '0')
                self.data_cache.append(bytes([2]) + data_size.encode() + str(data).encode())
            else:
                raise Exception(BufferError)

    def pull_data(self):
        while True:
            time.sleep(0.1)
            if self.data_cache:
                self.msg_broadcast(self.data_cache.pop(0))

    def msg_broadcast(self, data):
        for conn in self.peers:
            if conn != 'self':
                conn.sendall(data)

    def parser(self, peer_details, peer_data):
        # Only prints the string received in the buffer
        # Implement additional functionality in child class
        print(str(peer_details) + ' : ' + peer_data)


class Collab(CollabTemplate):
    def __init__(self):
        super().__init__()
        # Additional functionality implemented here but left our for now


def input_thread():
    while True:
        time.sleep(0.1)
        data = user_input.get()
        if data:
            collab.push(data)


if __name__ == '__main__':
    user_input = NonBlockingInput()
    user_input.start()
    collab = Collab()
    collab.initialize()
    threading.Thread(target=input_thread, args=()).start()
The code is not really all that good but I'm learning. Apart from the original issue any additional advice or issues in the current script are also appreciated.

Thanks
Reply
#2
That's actually not bad in my opinion for CPU usage... you should see what happens with Golang/goroutines hahaha

Sometimes even my Firefox and/or WebContent is at 40% or 50% as well... just saying :)
Reply
#3
Try to put a sleep inside the while True block:

    def read_input(self):
        while True:
            time.sleep(0.1)
            if not self.__alive:
                break
            if not self.input_queue.empty():
                self.lock.acquire()
                while not self.input_queue.empty():
                    self.user_input = self.user_input + self.input_queue.get()
                self.user_input = self.user_input[:-1]
                if self.user_input:
                    self.__input_cache.append(self.user_input.replace('\n', ''))
                self.lock.release()
                self.user_input = ''
                print('>', end='')
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
Reply
#4
Will update after trying tonight

(Jan-10-2020, 08:58 AM)DeaD_EyE Wrote: Try to put a sleep inside the while True block:

    def read_input(self):
        while True:
            time.sleep(0.1)
            if not self.__alive:
                break
            if not self.input_queue.empty():
                self.lock.acquire()
                while not self.input_queue.empty():
                    self.user_input = self.user_input + self.input_queue.get()
                self.user_input = self.user_input[:-1]
                if self.user_input:
                    self.__input_cache.append(self.user_input.replace('\n', ''))
                self.lock.release()
                self.user_input = ''
                print('>', end='')
Reply
#5
I don't like the idea the while True loop with a if queue.Empty() either. Instead, you can do
try:
    value = self.input_queue.get(timeout=0.1)
except queue.Empty:
    continue
else:
    ...
The correct way to wait in threads is not to use time.sleep() statements. Instead you can wait with a timeout on queues or locks or condition objects, or perhaps use selectors that also wait with timeouts.
iMuny likes this post
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020