Python Forum
How to process tasks as they complete when using TaskGroup?
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
How to process tasks as they complete when using TaskGroup?
#1
I understand the arguments for using the newer TaskGroup in place of older mechanisms based on create_task().

However, TaskGroup exits after all tasks are done. What if you want to start processing the results as soon as the first task finishes, like you can with asyncio.as_completed() or asyncio.wait( ..., return_when=FIRST_COMPLETED )?

If TaskGroup offers no way to do that, then isn't using it a trade-off rather than a strict upgrade?
Reply
#2
It's possible to replicate similar behavior.
Example:

import asyncio


class TaskException(Exception):
    pass


async def worker(n):
    try:
        await asyncio.sleep(n)
    except asyncio.CancelledError:
        print(f"Task {n} has been cancelled")
        # Task 10 has been cancelled

    if n == 9:
        raise TaskException(f"Exception in Task {n}")

    return n


async def main():
    tasks_working = set()
    tasks_done = set()

    def task_done(task):
        tasks_working.discard(task)
        tasks_done.add(task)
        

    async with asyncio.TaskGroup() as tg:
        for n in range(1,11):
            task = tg.create_task(worker(n))
            tasks_working.add(task)
            task.add_done_callback(task_done)

        while tasks_working or tasks_done:
            print(".", end="", flush=True)
            
            if tasks_done:
                print()
                
            for task in tasks_done.copy():
                print(f"Task {task.result()} is done")
                tasks_done.discard(task)
                
            await asyncio.sleep(1)
            
        print("All tasks are done")
    print("Leaving TGs context")

try:
    asyncio.run(main())
except *TaskException as e:
    for exc in e.exceptions:
        print(exc)
        # Exception in Task 9
Output:
Output:
... Task 2 is done Task 1 is done . Task 3 is done . Task 4 is done . Task 5 is done . Task 6 is done . Task 7 is done . Task 8 is done Task 10 has been cancelled Exception in Task 9
Hm, print("All tasks are done") and print("Leaving TGs context") are not called.
I thought if there is an Exception inside the TaskGroup, all Tasks are cancelled.
I think it's not called, because the Exceptions happens in the eventloop somewhere and ends it before the eventloop is able to execute the rest.
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
Reply
#3
I didn't quite follow of all that, but the gist of it seems to be "use task.add_done_callback() inside the TG context to implement timely task result processing without waiting for TG context to exit".

Thanks, I'll give that pattern a try.
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Managing recursive tasks in celery. vamix 1 794 Sep-04-2024, 06:36 PM
Last Post: deanhystad
  drawing a table with the status of tasks in each thread pyfoo 3 1,394 Mar-01-2024, 09:29 AM
Last Post: nerdyaks
  Python multiprocessing Pool apply async wait for process to complete sunny9495 6 11,436 Apr-02-2022, 06:31 AM
Last Post: sunny9495
  How to script repetitive tasks in dynaform using python BenneGrus 0 1,950 Dec-22-2021, 08:36 AM
Last Post: BenneGrus
  asyncio: executing tasks + httpserver freebsdd 2 3,564 Aug-29-2020, 09:50 PM
Last Post: freebsdd
  How to sharing object between multiple process from main process using Pipe Subrata 1 4,440 Sep-03-2019, 09:49 PM
Last Post: woooee
  How to add asynchronous tasks as they are needed? AlekseyPython 2 5,505 Jan-11-2019, 02:58 AM
Last Post: AlekseyPython
  How I can limit quantity of parallel executable tasks in asyncio? AlekseyPython 1 3,025 Oct-24-2018, 10:22 AM
Last Post: AlekseyPython
  run two tasks concurrently tony1812 1 3,178 Jul-24-2017, 05:43 PM
Last Post: Larz60+
  Tasks for Python Lamon112 2 52,063 Jan-13-2017, 03:32 AM
Last Post: metulburr

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020