Python Forum
[SOLVED] Why is this asyncio task blocking?
#1
This is my first time working with asyncio, and I can't understand why this task is blocking: the script never gets to the "Should reach here" messages. As far as I can tell, everything that should be awaited is awaited. Can anyone help? I'm using Python 3.10.6 on Ubuntu 22.04.2 LTS.

#!/usr/bin/env python
import ssl
import asyncio
from aiomysql import create_pool, DictCursor
from config import conf
 
 
user = conf['MySQL SecureCoop user']
password = conf['MySQL SecureCoop password']
host = conf['MySQL host']
# TODO Move into config
db = 'securecoop'
cafile = '/etc/nagios/ssl/cacert.pem'
 
 
async def connect():
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctx.verify_mode = ssl.CERT_REQUIRED
    ssl_ctx.check_hostname = False
    ssl_ctx.load_verify_locations(cafile=cafile)
 
    # TODO Move into config
    init_command = """\
        SET autocommit = 1;
        SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
        """
    # TODO Move into config
    autocommit = True
 
    loop = asyncio.get_event_loop()
    # TODO Move into config
    maxsize = 100
 
    pool = await create_pool(user=user,
                             db=db,
                             host=host,
                             password=password,
                             loop=loop,
                             ssl=ssl_ctx,
                             init_command=init_command,
                             maxsize=maxsize,
                             autocommit=autocommit)
    return pool
 
 
async def query(sql, params: tuple = tuple()):
    pool = await connect()
    async with pool.acquire() as conn:
        cur = await conn.cursor(DictCursor)
        await cur.execute(sql, params)
        return cur
 
 
async def outgoing_worker():
    while True:
        sql = 'SELECT * FROM `websocket_outgoing_queue`'
        results = await query(sql)
        print(repr(results))
 
        for row in await results.fetchall():
            serial = row.get('serial', None)
 
            sql = '''DELETE FROM `websocket_outgoing_queue`
            WHERE `serial` = %s'''
            params = (serial,)
            await query(sql, params)
 
            values = row.get('values', None)
 
            print(f'ws.send(json.dumps({repr(values)}))')
        print('About to await asyncio.sleep(1)')
        await asyncio.sleep(1)
 
 
async def asyncio_create_queue_tasks():
    outgoing_worker_task = asyncio.create_task(outgoing_worker())
    await outgoing_worker_task
    print('Should reach here inside asyncio_create_queue_tasks()')
 
 
loop = asyncio.get_event_loop()
print('About to asyncio_create_queue_tasks()')
asyncio.run(asyncio_create_queue_tasks())
print('Should reach here inside the main routine')
loop.run_forever()
Output:
(SecureCoop) root@api02:/SecureCoop# ./test_asyncio_db_async.py
/SecureCoop/./test_asyncio_db_async.py:81: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
About to asyncio_create_queue_tasks()
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88e50>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88f10>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8a500>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8b220>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88fa0>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd89f30>
About to await asyncio.sleep(1)
...
#2
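Found the bug. asyncio.run() only returns once the coroutine passed to it has finished, and asyncio_create_queue_tasks() awaits a worker that loops forever, so nothing after that call ever ran. I need to schedule the worker with loop.create_task() instead and let loop.run_forever() drive it. Modified code: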

#!/usr/bin/env python
import ssl
import asyncio
from aiomysql import create_pool, DictCursor
from config import conf
 
 
user = conf['MySQL SecureCoop user']
password = conf['MySQL SecureCoop password']
host = conf['MySQL host']
# TODO Move into config
db = 'securecoop'
cafile = '/etc/nagios/ssl/cacert.pem'
 
 
async def connect():
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ssl_ctx.verify_mode = ssl.CERT_REQUIRED
    ssl_ctx.check_hostname = False
    ssl_ctx.load_verify_locations(cafile=cafile)
 
    # TODO Move into config
    init_command = """\
        SET autocommit = 1;
        SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
        """
    # TODO Move into config
    autocommit = True
 
    loop = asyncio.get_event_loop()
    # TODO Move into config
    maxsize = 100
 
    pool = await create_pool(user=user,
                             db=db,
                             host=host,
                             password=password,
                             loop=loop,
                             ssl=ssl_ctx,
                             init_command=init_command,
                             maxsize=maxsize,
                             autocommit=autocommit)
    return pool
 
 
async def query(sql, params: tuple = tuple()):
    pool = await connect()
    async with pool.acquire() as conn:
        cur = await conn.cursor(DictCursor)
        await cur.execute(sql, params)
        return cur
 
 
async def outgoing_worker():
    while True:
        sql = 'SELECT * FROM `websocket_outgoing_queue`'
        results = await query(sql)
        print(repr(results))
 
        for row in await results.fetchall():
            serial = row.get('serial', None)
 
            sql = '''DELETE FROM `websocket_outgoing_queue`
            WHERE `serial` = %s'''
            params = (serial,)
            await query(sql, params)
 
            values = row.get('values', None)
 
            print(f'ws.send(json.dumps({repr(values)}))')
        print('About to await asyncio.sleep(1)')
        await asyncio.sleep(1)
 
 
loop = asyncio.get_event_loop()
loop.create_task(outgoing_worker())
print('Should reach here inside the main routine')
loop.run_forever()
The output below now includes "Should reach here inside the main routine", as I wanted.
Output:
(SecureCoop) root@api02:/SecureCoop# ./test_asyncio_db_async.py
/SecureCoop/./test_asyncio_db_async.py:75: DeprecationWarning: There is no current event loop
  loop = asyncio.get_event_loop()
Should reach here inside the main routine
<aiomysql.cursors.DictCursor object at 0x7f727255cb20>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7f727255cb80>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7f727255e140>
About to await asyncio.sleep(1)
...
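
For anyone who lands here with the same problem, the behaviour can be reproduced without a database. The following is only a minimal sketch under some assumptions: worker(), main() and the events list are hypothetical stand-ins, not names from the script above, and the worker is bounded so that the example terminates. It shows that asyncio.create_task() schedules the worker without waiting for it, and that asyncio.run() only returns once the coroutine it was given has finished. Because this variant never calls asyncio.get_event_loop() outside a running loop, it should also avoid the DeprecationWarning seen in the output.

```python
import asyncio

events = []  # records the order in which things happen


async def worker(iterations: int) -> None:
    # Hypothetical stand-in for outgoing_worker(); bounded so the demo ends.
    for i in range(iterations):
        events.append(f"worker tick {i}")
        await asyncio.sleep(0.01)


async def main() -> None:
    # create_task() only schedules the worker; main() keeps running.
    task = asyncio.create_task(worker(3))
    events.append("main continues after create_task()")
    # Other startup work would go here.
    # Awaiting the task is what makes main() wait for it. With a
    # `while True` worker this await would never complete.
    await task


asyncio.run(main())  # returns only once main() has finished
events.append("after asyncio.run()")
print(events)
```

With a worker that really does loop forever, the last line of main() would wait indefinitely, which is the same situation as in the original script. In that case any other long-running work probably has to be started inside main() as well, rather than after asyncio.run().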