Python Forum

Full Version: drawing a table with the status of tasks in each thread
Hi all!

I have no formal training but I get scripts working by searching the net and reading.

I am attempting to run a threaded script using concurrent.futures.ThreadPoolExecutor and have each task in each thread populate a "live table". So when thread_0 is running task_0, the row "thread_0" should show the state "running" in the column "task_0".

I am using Rich for the table layout and its "Live" feature to refresh the table, but any data I add to the table does not seem to be taken into account.

Has anyone done something like that, and how would you go about it? I get that this is not a small request, so pointers to docs or articles would also be great.

Thanks for any help/advice you can provide.

threads     task0     task1
--------------------------------
thread_0    done      running
thread_1    running
thread_2    done      done
Maybe this helps as a start.
Additionally, you should look here: https://rich.readthedocs.io/en/stable/live.html


import time
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from operator import itemgetter
from queue import Queue

from rich import get_console
from rich.live import Live
from rich.table import Table

console = get_console()
THE_END = object()


def _counter() -> Generator[int, None, None]:
    value = 0
    while True:
        yield value
        value += 1


count = _counter()


def generate_table(data: dict[int, str] | None) -> Table:
    data = data or {}
    table = Table()
    table.add_column("Worker ID")
    table.add_column("Status text")

    for worker_id, msg in sorted(data.items(), key=itemgetter(0)):
        table.add_row(str(worker_id), msg)

    return table


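def show_status(queue: Queue):
    # Latest message per worker; only this thread reads or writes it.
    workers: dict[int, str] = {}

    with Live() as live:
        # Every item is a non-empty tuple, so the loop only ends via the sentinel below.
        while data := queue.get():
            worker_id, msg = data

            # THE_END is the sentinel that runner() sends once all workers have finished.
            if worker_id is THE_END:
                queue.task_done()
                return

            workers[worker_id] = msg
            queue.task_done()

            # Rebuild the table from the current state and hand it to Live.
            live.update(generate_table(workers))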


def worker(name: str, delay: int | float, queue: Queue) -> int:
    worker_id = next(count)

    queue.put((worker_id, f"Starting {name}"))
    time.sleep(1)

    queue.put((worker_id, f"Waiting {delay:.2f} seconds."))
    time.sleep(delay)

    queue.put((worker_id, f"{name} done"))

    return int(delay)


def runner():
    status_queue = Queue()

    with ThreadPoolExecutor(3) as pool:
        pool.submit(show_status, status_queue)

        futures = []
        for i in range(1, 6):
            futures.append(pool.submit(worker, f"Worker {i}", i, status_queue))

        while True:
            if all(fut.done() for fut in futures):
                break
            time.sleep(1)

        status_queue.put((THE_END, ""))


if __name__ == "__main__":
    runner()
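
To get closer to the thread-by-task grid from the original question, the status dictionary could be keyed by (thread, task) instead of by worker ID. This is only a rough sketch: grid_rows and to_rich_table are hypothetical helper names that are not part of the script above, and to_rich_table assumes Rich is installed (it only uses Table.add_column and Table.add_row).

```python
def grid_rows(status: dict[tuple[str, str], str]) -> tuple[list[str], list[list[str]]]:
    """Turn {(thread, task): state} into a header and one row per thread."""
    threads = sorted({thread for thread, _ in status})
    tasks = sorted({task for _, task in status})
    header = ["threads", *tasks]
    # A task that a thread has not touched yet gets an empty cell.
    rows = [
        [thread, *(status.get((thread, task), "") for task in tasks)]
        for thread in threads
    ]
    return header, rows


def to_rich_table(header: list[str], rows: list[list[str]]):
    """Build a Rich Table from the rows (Rich is imported lazily)."""
    from rich.table import Table

    table = Table()
    for column in header:
        table.add_column(column)
    for row in rows:
        table.add_row(*row)
    return table
```

Inside show_status, the call live.update(generate_table(workers)) could then become live.update(to_rich_table(*grid_rows(status))), with the workers putting (thread, task, state) on the queue. Note that the names are sorted as plain strings, so "task10" would sort before "task2".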
Thanks DeaD_EyE,

It's impressive! I'll take the time to understand what each library you are using does, but it works perfectly. Thank you so much.
Use a global dictionary to store the status, and update it from each thread. Re-render the table in the main thread on a timer to pick up the changes.
Create a Queue and have each thread put status updates into it. Poll the queue in the main thread and update the table.
Use the concurrent.futures.Future objects returned by the executor to collect status results and update the table.
Look into using a reactive framework like RxPY to handle concurrency and update the table reactively.
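
As a rough illustration of the first option, here is a minimal standard-library sketch of a shared status store guarded by a lock, with the calling thread re-rendering on a timer. StatusBoard, do_task and run are hypothetical names, and render is any callable you supply (for example one that builds a Rich table and passes it to live.update).

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class StatusBoard:
    """Shared task status, guarded by a lock (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: dict[str, tuple[str, str]] = {}

    def set(self, task: str, state: str) -> None:
        # Record which pool thread reported the state.
        thread = threading.current_thread().name
        with self._lock:
            self._state[task] = (thread, state)

    def snapshot(self) -> dict[str, tuple[str, str]]:
        # Return a copy so the renderer never sees a half-updated dict.
        with self._lock:
            return dict(self._state)


def do_task(board: StatusBoard, name: str, delay: float) -> None:
    board.set(name, "running")
    time.sleep(delay)  # stand-in for real work
    board.set(name, "done")


def run(render, n_tasks: int = 4, delay: float = 0.5, interval: float = 0.25):
    """Run the tasks and call render(snapshot) every `interval` seconds."""
    board = StatusBoard()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as pool:
        futures = [
            pool.submit(do_task, board, f"task{i}", delay) for i in range(n_tasks)
        ]
        while True:
            # Check first, render second, so the last frame shows every task done.
            finished = all(future.done() for future in futures)
            render(board.snapshot())
            if finished:
                break
            time.sleep(interval)
    return board.snapshot()
```

Calling run(print) prints one snapshot per tick. The thread_name_prefix argument makes the pool threads report names such as thread_0 and thread_1.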
The key things to watch out for are:

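Updates to the Rich table should go through a single thread (not necessarily the main one), so that worker threads never touch the table directly.
You'll need thread-safe data structures for sharing state between threads.
Avoid refreshing the table too rapidly; redraw on a short timer or delay instead.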
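
One way to combine the last two points is to drain every pending update from a queue.Queue in one go and redraw at most once per tick. A minimal sketch, assuming the workers put (key, value) pairs on the queue; drain is a hypothetical helper name:

```python
import queue


def drain(updates: queue.Queue, state: dict) -> bool:
    """Apply all pending (key, value) updates; return True if anything changed."""
    changed = False
    while True:
        try:
            key, value = updates.get_nowait()
        except queue.Empty:
            # Nothing left to apply for this tick.
            return changed
        state[key] = value  # later updates for the same key win
        changed = True
```

The rendering thread can call drain once per tick, rebuild the table only when it returns True, and then sleep briefly. Rich's Live also takes a refresh_per_second argument that caps how often the screen is redrawn.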
I'd recommend looking into some examples using RxPY or other reactive frameworks, as they provide helpful abstractions for this kind of concurrent, live-updating UI.