Python Forum
asyncio stops responding to requests
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
asyncio stops responding to requests
#1
Hi All,

I have a interesting problem with a server script I am working on.

The Python 3.6 server script takes requests from a client program written in C and sends responses back.
The C code sends the requests in two parts, first the length of the message is sent, then the message itself is sent.

The driver of the client code is a 3rd program written in Java. The Java code initiates the requests by sending a block of requests,
initially in blocks of 60.

The key snippet of the Python server script is:

import asyncio
import socket
.....

addr = (localhost, 50000)

loop = asyncio.get_event_loop()

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
sock.bind(addr)
sock.listen(N)

async def sserver():
     while True:
          conn, addr = await loop.sock_accept(sock)
          sr = SRequests(pool, loop, log)
          loop.create_task(sr.ahandle(conn))

loop.create_task(sserver())

try:
    loop.run_forever()
except KeyboardInterrupt:
    sserver.close()
    loop.run_until_complete(sserver.wait_close())
    loop.close()
    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
where the method ahandle in the custom class SRequests does all the work of decoding the messages, calling other methods
in other custom classes to generate the responses, and then sending the encoded replies back to the C code.

One of the key tasks of the Python server script is to execute small SQL INSERT and SELECT against tables in a PostgreSQL DB.
The DB is configured to handle 1024 connections at a time.

The overall purpose of the Java/C/Python client/server code is to download files from a remote site to a local directory.

Yet, testing with N = 10, 20 and 100,000 in the listen statement, yields just between 4 and 6 downloaded files, not the expected 60 in the initial block
of requests.

What I see is the block of 60 requests, initiated on the Java side, results in 60 DB connections being made on the Python side. Then the Python server script begins to process the requests, but stops after just the initial 60 asynchronous requests are received. Only between 4 and 6 files are ever downloaded.

The SQL used to monitor the DB connections is

select datid, pid, usename, state, query, backend_type from pg_stat_activity order by pid;

The result shows 60 connections, and between 4 and 6 w/state = COMMIT.

Since the DB is configured to handle many more connections than it is getting, why does the processing on the Python side simply stop.
Tailing the log file for the Python server script shows that log entries simply stop being logged, yet the Java code continues to try and send more requests.

Any tips, ideas, or suggestions would be greatly appreciated.

Thanks!

--Ed
Reply
#2
I found the solution to my problem. I needed to add a callback function for the two tasks, and return values for each Task.

Otherwise, when an exception is raised in a Task, it will not be seen until the Task goes out-of-scope, which in my case since
the Python server script is running in a run_forever loop, will never go out-of-scope until the server script receives a SIGINT.

In case anyone else has a similar problem, here's the update code snippet:

import asyncio
import socket
.....
 
addr = (localhost, 50000)
 
loop = asyncio.get_event_loop()
 
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(False)
sock.bind(addr)
sock.listen(N)
 
def _handle_ahandle_throws(task: asyncio.Task):
      try:
        task.result()
      except asyncio.CancelledError:
        pass
      except Exception:
        logging.exception('Exception raised in ahandle...by task = %r', task)

async def sserver():
     while True:
          conn, addr = await loop.sock_accept(sock)
          sr = SRequests(pool, loop, log)
          sums_task = loop.create_task(sr.ahandle(conn))
          sums_task.add_done_callback(_handle_ahandle_throws)
 
loop.create_task(sserver())
 
try:
    loop.run_forever()
except KeyboardInterrupt:
    sserver.close()
    loop.run_until_complete(sserver.wait_close())
    loop.close()
    sock.shutdown(socket.SHUT_RDWR)
    sock.close()
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()
    sock.shutdown(socket.SHUT_RDWR)
    sock.close()

...

    class SRequests(asyncio.Protocol, SRequest)
         def __init__(self, pool, loop, log):
              self.pool = pool
              self.loop = loop
              self.log = log
              self.setup()
              return None

         def _handle_db_task_throws(task: asyncio.Task):
              try:
                  task.result()
              except asyncio.CancelledError:
                  pass
              except Exception:
                 logging.exception('Exception raised by db_task = %r', task)

          async def getDBConn(self):
                ...
                dbConn = DBConnection(...)
                ...
                return dbConn

          def setup(self):
                ...
                db_task = self.loop.create_task(self.getDBConn())
                db_task.add_done_callback(_handle_db_task_throws)
                ...
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Configuring requests module to use secondary IP on server for outbound requests mohit 1 6,567 Oct-24-2016, 05:21 PM
Last Post: nilamo

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020