Python Forum
Asyncio subprocess blocking with python workload only
#1
Issue: if you start a subprocess with a pipe on stdout, in order to process its output concurrently using asyncio, everything works as expected except when that subprocess is a Python script.
Expected: start a subprocess running a Python script and concurrently receive its stdout output one line at a time for the duration of the subprocess.
Actual: the subprocess starts the Python script, stdout output builds up (for about 637 lines), then all those lines are received by the main process at once, and the subprocess continues to build up more lines of output.


In the sample Python script below, the function run runs three different scripts in order: bash, python, and nodejs. Bash and nodejs work as expected: one line of stdout output in the subprocess is one line returned through the pipe and handled by the main process. The Python script should do the same. Instead, its output is blocked, queued, or buffered in some way until either the Python script finishes or there are around 637 lines in the stdout pipe, at which point all the lines in the pipe are handled by the main process at the same time.

The main process writes to stream.log: as output is returned from the subprocess, each line is written there. Each subprocess writes both to stdout and to stream2.log; it writes to stdout so that the main process can receive the lines of output and write them to stream.log. Each subprocess delays 500 ms between lines, so you can easily see when the lines of stdout are being piped to the main process. First goes bash, then python, and finally nodejs. You will see the bash output written to both logs at nearly the same time, with roughly the same delay between writes. Next, you see the Python script writing to stream2.log while nothing is written to stream.log; when the Python script finishes, stream.log fills with all the lines that were buffered or blocked. Last, nodejs starts and stream.log and stream2.log are again written to at the same frequency. Bash and nodejs work as expected.
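For reference, the same symptom shows up in a stripped-down version without the log files (CHILD and main are names I made up just for this repro): the five lines arrive in one burst when the child exits, instead of one line every 100 ms.

```python
import asyncio
import sys

# Child prints 5 numbered lines with a short delay between them.
CHILD = "import time\nfor i in range(5):\n    print(i)\n    time.sleep(0.1)\n"

async def main() -> list[str]:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD,
        stdout=asyncio.subprocess.PIPE,
    )
    lines = []
    while True:
        data = await proc.stdout.readline()
        if not data:
            break
        # Each readline() should complete ~100 ms apart, but with a
        # CPython child they all complete together at the end.
        lines.append(data.decode().rstrip())
    await proc.wait()
    return lines

print(asyncio.run(main()))
```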

Setup
Save the contents of the script below to a file my_script.py, then create the two log files:
touch stream.log
touch stream2.log

Running
Terminal window 1:
tail -f stream.log
Terminal window 2:
tail -f stream2.log
In a third terminal:
python my_script.py
import asyncio
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(filename="stream.log", encoding="utf-8", level=logging.DEBUG)

js_code = """
const fs = require('fs');
const logFile = fs.createWriteStream('stream2.log', { flags: 'a' }); // 'a' for append, 'w' for overwrite

(async function() {
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
let i = 0;
while( i < 10 ){
  let msg = `counter: ${i}`;
  console.log(msg);
  logFile.write(`${msg}\n`);
  i += 1;
  await sleep(500)
}
console.log("JS completed")
})()
"""

bash_code = 'for i in $(seq 1 10); do echo "Hi, $i" >> stream2.log; echo "Hi, $i"; sleep .5; done'

py_code = """
import time
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(filename="stream2.log", encoding="utf-8", level=logging.DEBUG)

i = 0
while i < 10:
    msg = f"counter: {i}"
    print(msg)
    logger.debug(msg)
    i += 1
    time.sleep(0.5)
print("completed")
logger.debug("PY completed")
"""


async def run(command: list[str]):
    proc = await asyncio.create_subprocess_exec(
        *command,
        stdout=asyncio.subprocess.PIPE,
    )

    while True:
        data = await proc.stdout.readline()
        if not data:
            logger.debug("end of stream")
            break
        line = data.decode().rstrip()
        logger.debug(line)

    await proc.wait()
    return proc.returncode


asyncio.run(run(["bash", "-c", bash_code]))
asyncio.run(run(["python", "-c", py_code]))
asyncio.run(run(["node", "-e", js_code]))
If anybody can steer me in the right direction on this, it would be much appreciated.
#2
Python block-buffers stdout when it is not attached to a terminal, so the child script's output only reaches the pipe when the buffer fills (likely the 8 KiB default, which matches the ~637 lines you saw) or the process exits. Bash's echo and node's console.log flush each line as it is written, which is why those two behave as expected. Set PYTHONUNBUFFERED=1 in the child's environment:

import os

proc = await asyncio.create_subprocess_exec(
    *command,
    stdout=asyncio.subprocess.PIPE,
    env=dict(os.environ, PYTHONUNBUFFERED="1"),
)
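For completeness, the same effect is available via the interpreter's -u flag (or print(..., flush=True) inside the child script). A minimal self-contained sketch, with run_unbuffered being a name of my own choosing:

```python
import asyncio
import os
import sys

async def run_unbuffered(code: str) -> list[str]:
    # "-u" makes the child's stdout unbuffered even when it is a pipe;
    # it has the same effect as PYTHONUNBUFFERED=1 in the environment
    # (set here as well, just to show both forms).
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-u", "-c", code,
        stdout=asyncio.subprocess.PIPE,
        env=dict(os.environ, PYTHONUNBUFFERED="1"),
    )
    lines = []
    while True:
        data = await proc.stdout.readline()
        if not data:
            break
        lines.append(data.decode().rstrip())
    await proc.wait()
    return lines

print(asyncio.run(run_unbuffered("print('a'); print('b')")))
```

With either form, each line becomes available to readline() as soon as the child prints it.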

