Python Forum
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Coverting to Epoll
#1
Hello I am running into a issue where select.select is hitting file limits with the number of sockets we're running over. To fix this, a move from select.select to select.epoll seems like the solution to over come the OS limit (well not the OS, it seems to be with python?). Anything over 1024 causes it to crash and give an error ValueError: filedescriptor out of range in select()

Either way, I am trying to rewrite the following code to support epoll instead of select.select and see if that fixes any of the issues we're noticing with a large number of traffic flowing in.

Here is what it looks like now:
class TCPServer:
    request_queue_size = 20

    def __init__(self, address):
        self.address = Address.wrap(address)
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False
        self.socket = socket.socket(self.address.family, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.address())
        self.address = Address.wrap(self.socket.getsockname())
        self.socket.listen(self.request_queue_size)
        self.handler_counter = Counter()

    def connection_thread(self, connection, client_address):
        with self.handler_counter:
            client_address = Address(client_address)
            try:
                self.handle_client_connection(connection, client_address)
            except:
                self.handle_error(connection, client_address)
            finally:
                close_socket(connection)

    def serve_forever(self, poll_interval=0.1):
        self.__is_shut_down.clear()
        try:
            while not self.__shutdown_request:
                try:
                    r, w_, e_ = select.select(
                        [self.socket], [], [], poll_interval)
                except select.error as ex:  # pragma: no cover
                    if ex[0] == EINTR:
                        continue
                    else:
                        raise
                if self.socket in r:
                    connection, client_address = self.socket.accept()
                    t = basethread.BaseThread(
                        "TCPConnectionHandler (%s: %s:%s -> %s:%s)" % (
                            self.__class__.__name__,
                            client_address[0],
                            client_address[1],
                            self.address.host,
                            self.address.port
                        ),
                        target=self.connection_thread,
                        args=(connection, client_address),
                    )
                    t.setDaemon(1)
                    try:
                        t.start()
                    except threading.ThreadError:
                        self.handle_error(connection, Address(client_address))
                        connection.close()
        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        self.__shutdown_request = True
        self.__is_shut_down.wait()
        self.socket.close()
        self.handle_shutdown()

    def handle_error(self, connection_, client_address, fp=sys.stderr):
        """
            Called when handle_client_connection raises an exception.
        """
        # If a thread has persisted after interpreter exit, the module might be
        # none.
        if traceback:
            exc = str(traceback.format_exc())
            print(u'-' * 40, file=fp)
            print(
                u"Error in processing of request from %s" % repr(client_address), file=fp)
            print(exc, file=fp)
            print(u'-' * 40, file=fp)

    def handle_client_connection(self, conn, client_address):  # pragma: no cover
        """
            Called after client connection.
        """
        raise NotImplementedError

    def handle_shutdown(self):
        """
            Called after server shutdown.
        """

    def wait_for_silence(self, timeout=5):
        start = time.time()
        while 1:
            if time.time() - start >= timeout:
                raise exceptions.Timeout(
                    "%s service threads still alive" %
                    self.handler_counter.count
                )
            if self.handler_counter.count == 0:
                return
Here is what I have so far - but obs. not the end result I am looking for:
class TCPServer:
    request_queue_size = 20

    def __init__(self, address):
        self.address = Address.wrap(address)
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False
        self.socket = socket.socket(self.address.family, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.socket.bind(self.address())
        self.address = Address.wrap(self.socket.getsockname())
        self.socket.listen(self.request_queue_size)
        self.handler_counter = Counter()
        self.epoll = select.epoll()
        self.epoll.register(self.socket.fileno(), select.EPOLLIN)

    def connection_thread(self, connection, client_address):
        with self.handler_counter:
            client_address = Address(client_address)
            try:
                self.handle_client_connection(connection, client_address)
            except:
                self.handle_error(connection, client_address)
            finally:
                close_socket(connection)

    def serve_forever(self, poll_interval=0.1):
        self.__is_shut_down.clear()
        try:
            connections = {}; requests = {}; responses = {}
            while not self.__shutdown_request:
                events = self.epoll.poll(1)
                for fileno, event in events:
                    if fileno == self.socket.fileno():
                        connection, address = self.socket.accept()
                        connection.setblocking(0)
                        self.epoll.register(connection.fileno(), select.EPOLLIN)
                        connections[connection.fileno()] = connection
                    elif event & select.EPOLLIN:
                        requests[fileno] += connections[fileno].recv(1024)
                        if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
                           epoll.modify(fileno, select.EPOLLOUT)
                           connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 1)
                    elif event & select.EPOLLOUT:
                        byteswritten = connections[fileno].send(responses[fileno])
                        responses[fileno] = responses[fileno][byteswritten:]
                        if len(responses[fileno]) == 0:
                           connections[fileno].setsockopt(socket.IPPROTO_TCP, socket.TCP_CORK, 0)
                           epoll.modify(fileno, 0)
                           connections[fileno].shutdown(socket.SHUT_RDWR)
                    elif event & select.EPOLLHUP:
                        epoll.unregister(fileno)
                        connections[fileno].close()
                        del connections[fileno]

        finally:
            self.__shutdown_request = False
            self.__is_shut_down.set()

    def shutdown(self):
        self.__shutdown_request = True
        self.__is_shut_down.wait()
        self.epoll.unregister(self.socket.fileno())
        self.socket.close()
        self.handle_shutdown()

    def handle_error(self, connection_, client_address, fp=sys.stderr):
        """
            Called when handle_client_connection raises an exception.
        """
        # If a thread has persisted after interpreter exit, the module might be
        # none.
        if traceback:
            exc = str(traceback.format_exc())
            print(u'-' * 40, file=fp)
            print(
                u"Error in processing of request from %s" % repr(client_address), file=fp)
            print(exc, file=fp)
            print(u'-' * 40, file=fp)

    def handle_client_connection(self, conn, client_address):  # pragma: no cover
        """
            Called after client connection.
        """
        raise NotImplementedError

    def handle_shutdown(self):
        """
            Called after server shutdown.
        """

    def wait_for_silence(self, timeout=5):
        start = time.time()
        while 1:
            if time.time() - start >= timeout:
                raise exceptions.Timeout(
                    "%s service threads still alive" %
                    self.handler_counter.count
                )
            if self.handler_counter.count == 0:
                return
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020