Feb-29-2024, 09:45 AM
Maybe this helps as a start.
Additionally, you should look at the Rich documentation on live displays: https://rich.readthedocs.io/en/stable/live.html
"""Show the status of several worker threads in a live-updating Rich table.

Workers push ``(worker_id, message)`` tuples onto a shared queue; a single
display thread consumes them and re-renders a table with the latest message
per worker.
"""

import itertools
import time
from collections.abc import Iterator
from concurrent.futures import ThreadPoolExecutor, wait
from operator import itemgetter
from queue import Queue

from rich import get_console
from rich.live import Live
from rich.table import Table

console = get_console()

# Sentinel placed in the ``worker_id`` slot of a queue item to tell the
# display thread to stop. Compared by identity, so it can never collide
# with a real worker id.
THE_END = object()


def _counter() -> Iterator[int]:
    """Return an endless iterator yielding 0, 1, 2, ...

    ``itertools.count`` is used instead of a hand-written generator because
    the iterator is shared between worker threads: two threads calling
    ``next()`` on the same Python generator at once can raise
    ``ValueError: generator already executing``.
    """
    return itertools.count()


# Shared source of worker ids; each worker takes the next value on start.
count = _counter()


def generate_table(data: dict[int, str] | None) -> Table:
    """Build a two-column table of worker ids and their status text.

    Args:
        data: Mapping of worker id to its latest status message. ``None``
            (or an empty mapping) produces a table with headers only.

    Returns:
        A new ``Table`` with one row per worker, ordered by worker id.
    """
    data = data or {}
    table = Table()
    table.add_column("Worker ID")
    table.add_column("Status text")
    for worker_id, msg in sorted(data.items(), key=itemgetter(0)):
        table.add_row(str(worker_id), msg)
    return table


def show_status(queue: Queue) -> None:
    """Consume status messages from *queue* and render them live.

    Each queue item is a ``(worker_id, message)`` tuple. The latest message
    per worker is kept and the table is re-rendered after every update. The
    function returns once an item whose first element is ``THE_END`` arrives.

    Args:
        queue: Queue that workers (and finally the runner) put items on.
    """
    workers: dict[int, str] = {}
    with Live(console=console) as live:
        while data := queue.get():
            worker_id, msg = data
            if worker_id is THE_END:
                queue.task_done()
                return
            workers[worker_id] = msg
            queue.task_done()
            live.update(generate_table(workers))


def worker(name: str, delay: int | float, queue: Queue) -> int:
    """Simulate a unit of work and report its progress through *queue*.

    Args:
        name: Human-readable name used in the status messages.
        delay: Seconds to sleep to simulate the actual work.
        queue: Queue that receives ``(worker_id, message)`` status tuples.

    Returns:
        ``delay`` truncated to an ``int``.
    """
    worker_id = next(count)
    queue.put((worker_id, f"Starting {name}"))
    time.sleep(1)
    queue.put((worker_id, f"Waiting {delay:.2f} seconds."))
    time.sleep(delay)
    queue.put((worker_id, f"{name} done"))
    return int(delay)


def runner() -> None:
    """Start the display thread and five workers, then wait for completion."""
    status_queue: Queue = Queue()
    # The display task is submitted first and occupies one of the three
    # threads for the whole run, leaving two threads for the workers.
    with ThreadPoolExecutor(3) as pool:
        pool.submit(show_status, status_queue)
        futures = [
            pool.submit(worker, f"Worker {i}", i, status_queue)
            for i in range(1, 6)
        ]
        try:
            # Block until every worker has finished instead of polling.
            wait(futures)
        finally:
            # Always release the display thread; otherwise leaving the
            # ``with`` block would wait forever on its ``queue.get()``.
            status_queue.put((THE_END, ""))


if __name__ == "__main__":
    runner()
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!