Hi all!
I have no formal training, but I get scripts working by searching the net and reading.
I am trying to run a threaded script using concurrent.futures.ThreadPoolExecutor
and have each task in each thread populate a "live table". So when thread_0 is running task_0, I would have a column "task_0" showing the state "running" in the row "thread_0".
I am trying to use Rich for the table layout and its "Live" feature to refresh the table, but it seems that any data I add to the table is not taken into account.
Has anyone done something like this, and how would you go about it? I get that this is not a small request, so pointers to docs or articles are also great.
Thanks for any help/advice you can provide.
threads     task0      task1
--------------------------------
thread_0    done       running
thread_1    running
thread_2    done       done
Maybe this helps as a start.
Additionally, you should look here:
https://rich.readthedocs.io/en/stable/live.html
import time
from collections.abc import Generator
from concurrent.futures import ThreadPoolExecutor
from operator import itemgetter
from queue import Queue

from rich import get_console
from rich.live import Live
from rich.table import Table

console = get_console()
# Sentinel object that tells the display thread to stop.
THE_END = object()


def _counter() -> Generator[int, None, None]:
    value = 0
    while True:
        yield value
        value += 1


# Shared counter used to hand out worker IDs.
count = _counter()


def generate_table(data: dict[int, str] | None) -> Table:
    # Build a fresh table from the current status of every worker.
    data = data or {}
    table = Table()
    table.add_column("Worker ID")
    table.add_column("Status text")
    for worker_id, msg in sorted(data.items(), key=itemgetter(0)):
        table.add_row(str(worker_id), msg)
    return table


def show_status(queue: Queue):
    # Only this function touches the Live display; workers just send messages.
    workers: dict[int, str] = {}
    with Live() as live:
        while data := queue.get():
            worker_id, msg = data
            if worker_id is THE_END:
                queue.task_done()
                return
            workers[worker_id] = msg
            queue.task_done()
            live.update(generate_table(workers))


def worker(name: str, delay: int | float, queue: Queue) -> int:
    worker_id = next(count)
    queue.put((worker_id, f"Starting {name}"))
    time.sleep(1)
    queue.put((worker_id, f"Waiting {delay:.2f} seconds."))
    time.sleep(delay)
    queue.put((worker_id, f"{name} done"))
    return int(delay)


def runner():
    status_queue = Queue()
    with ThreadPoolExecutor(3) as pool:
        # The display occupies one pool thread, leaving two for the workers.
        pool.submit(show_status, status_queue)
        futures = []
        for i in range(1, 6):
            futures.append(pool.submit(worker, f"Worker {i}", i, status_queue))
        # Wait until every worker has finished, then stop the display.
        while True:
            if all(fut.done() for fut in futures):
                break
            time.sleep(1)
        status_queue.put((THE_END, ""))


if __name__ == "__main__":
    runner()
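The example above shows one row per worker, while the layout asked for in the first post has one row per thread and one column per task. A minimal sketch of that reshaping step, using only the standard library, could look like this. The helper name build_rows, the (thread, task) key format and the sample data are made up for illustration; each returned row could be passed to table.add_row(*row) inside generate_table.

```python
def build_rows(
    status: dict[tuple[str, str], str],
    tasks: list[str],
) -> list[list[str]]:
    """Turn {(thread, task): state} into one row per thread, one cell per task."""
    # Collect the names of all threads that have reported anything so far.
    threads = sorted({thread for thread, _task in status})
    rows = []
    for thread in threads:
        # An empty string means this thread has not reported on that task yet.
        cells = [status.get((thread, task), "") for task in tasks]
        rows.append([thread, *cells])
    return rows


# Hypothetical snapshot, loosely based on the table in the first post.
snapshot = {
    ("thread_0", "task0"): "done",
    ("thread_0", "task1"): "running",
    ("thread_1", "task0"): "running",
    ("thread_2", "task0"): "done",
    ("thread_2", "task1"): "done",
}
rows = build_rows(snapshot, ["task0", "task1"])
for row in rows:
    print(row)
```

With this shape, the workers would send (thread name, task name, state) messages instead of (worker_id, msg), and the table would need one add_column call per task.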
Thanks DeaD_EyE,
it's impressive! I'll take the time to understand what each library you are using does, but it works perfectly. Thank you so much.
Use a global dictionary to store status, and update it from each thread. Re-render the table in the main thread on a Timer to pick up changes.
Create a Queue and have each thread put status updates into it. Poll the queue in the main thread and update the table.
Use the concurrent.futures.Future objects returned by submit() to collect results, and update the table as each one completes.
Look into using a reactive framework like RxPy to handle concurrency and updating the table reactively.
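The Future-based option can be sketched with concurrent.futures.as_completed, which yields each future as soon as it finishes. This is only an outline: do_task, run_tasks and the task names are placeholders, not part of any library. Note that a future only tells you whether a task has finished, so intermediate states such as "running" would still need a queue or a shared dictionary.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def do_task(name: str, delay: float) -> str:
    """Placeholder for real work: sleeps, then returns a result string."""
    time.sleep(delay)
    return f"{name} finished after {delay:.2f}s"


def run_tasks(delays: dict[str, float]) -> dict[str, str]:
    status = {name: "pending" for name in delays}
    with ThreadPoolExecutor(max_workers=3) as pool:
        # Map each future back to the task name it belongs to.
        futures = {
            pool.submit(do_task, name, delay): name
            for name, delay in delays.items()
        }
        # This loop runs in the calling thread, so it is a convenient
        # place to rebuild the table and call live.update(...).
        for future in as_completed(futures):
            name = futures[future]
            status[name] = "done" if future.exception() is None else "failed"
    return status


final = run_tasks({"task0": 0.05, "task1": 0.01, "task2": 0.03})
print(final)
```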
The key things to watch out for are:
Keep updates to the Rich table in a single thread: the main thread or one dedicated display thread, as in the example above.
You'll need thread-safe data structures for sharing state between threads.
Avoid updating the table too rapidly; redraw on a short timer or delay rather than on every single change.
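The last two points can be combined in a rough sketch: a dictionary guarded by a lock, which the workers write to, and a main-thread loop that takes a snapshot at a fixed interval. StatusBoard, set_state and work are hypothetical names, and the sleep times are arbitrary. In a real script the snapshot would be turned into a Rich table and handed to live.update(); Live also accepts a refresh_per_second argument that limits how often the screen is redrawn.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class StatusBoard:
    """A dict of {(thread, task): state} guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: dict[tuple[str, str], str] = {}

    def set_state(self, task: str, state: str) -> None:
        # Record the state under the name of the thread that is calling.
        thread = threading.current_thread().name
        with self._lock:
            self._state[(thread, task)] = state

    def snapshot(self) -> dict[tuple[str, str], str]:
        # Copy under the lock so the renderer works on a stable view.
        with self._lock:
            return dict(self._state)


def work(board: StatusBoard, task: str) -> None:
    board.set_state(task, "running")
    time.sleep(0.05)  # stand-in for real work
    board.set_state(task, "done")


board = StatusBoard()
# thread_name_prefix gives pool threads names such as thread_0, thread_1.
with ThreadPoolExecutor(max_workers=2, thread_name_prefix="thread") as pool:
    futures = [pool.submit(work, board, f"task{i}") for i in range(4)]
    # Poll at a fixed interval instead of redrawing on every change.
    while not all(f.done() for f in futures):
        current = board.snapshot()  # build the table from this snapshot
        time.sleep(0.02)

final = board.snapshot()
print(final)
```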
I'd recommend looking into some examples using RxPy or reactive frameworks as they provide helpful abstractions for this kind of concurrent, live updating UI.