Jun-06-2023, 02:42 PM
(This post was last modified: Jun-06-2023, 02:44 PM by SecureCoop.)
This is my first time working with asyncio and I can't understand why this task is blocking. Everything is awaited that should be awaited. Please help? Using Python 3.10.6 on Ubuntu 22.04.2 LTS.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
#!/usr/bin/env python import ssl import asyncio from aiomysql import create_pool, DictCursor from config import conf user = conf[ 'MySQL SecureCoop user' ] password = conf[ 'MySQL SecureCoop password' ] host = conf[ 'MySQL host' ] # TODO Move into config db = 'securecoop' cafile = '/etc/nagios/ssl/cacert.pem' async def connect(): ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ssl_ctx.verify_mode = ssl.CERT_REQUIRED ssl_ctx.check_hostname = False ssl_ctx.load_verify_locations(cafile = cafile) # TODO Move into config init_command = """\ SET autocommit = 1; SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED; """ # TODO Move into config autocommit = True loop = asyncio.get_event_loop() # TODO Move into config maxsize = 100 pool = await create_pool(user = user, db = db, host = host, password = password, loop = loop, ssl = ssl_ctx, init_command = init_command, maxsize = maxsize, autocommit = autocommit) return pool async def query(sql, params: tuple = tuple ()): pool = await connect() async with pool.acquire() as conn: cur = await conn.cursor(DictCursor) await cur.execute(sql, params) return cur async def outgoing_worker(): while True : sql = 'SELECT * FROM `websocket_outgoing_queue`' results = await query(sql) print ( repr (results)) for row in await results.fetchall(): serial = row.get( 'serial' , None ) sql = '''DELETE FROM `websocket_outgoing_queue` WHERE `serial` = %s''' params = (serial,) await query(sql, params) values = row.get( 'values' , None ) print ( f 'ws.send(json.dumps({repr(values)}))' ) print ( 'About to await asyncio.sleep(1)' ) await asyncio.sleep( 1 ) async def asyncio_create_queue_tasks(): outgoing_worker_task = asyncio.create_task(outgoing_worker()) await outgoing_worker_task print ( 'Should reach here inside asyncio_create_queue_tasks()' ) loop = asyncio.get_event_loop() print ( 'About to asyncio_create_queue_tasks()' ) asyncio.run(asyncio_create_queue_tasks()) print ( 'Should reach here inside the main routine' ) loop.run_forever() |
Output:(SecureCoop) root@api02:/SecureCoop# ./test_asyncio_db_async.py
/SecureCoop/./test_asyncio_db_async.py:81: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
About to asyncio_create_queue_tasks()
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88e50>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88f10>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8a500>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd8b220>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd88fa0>
About to await asyncio.sleep(1)
<aiomysql.cursors.DictCursor object at 0x7fe1ecd89f30>
About to await asyncio.sleep(1)
...