Thread: Terminating Subprocesses and Threads while they're calculating (/thread-30342.html)
Python Forum (https://python-forum.io), Forum: Python Coding (https://python-forum.io/forum-7.html), Forum: General Coding Help (https://python-forum.io/forum-8.html)
Terminating Subprocesses and Threads while they're calculating - lvlanson - Oct-16-2020

I am working on a larger project in which I have two threads (in the same process) and one separate process. One of the threads is the GUI, the other is a sentinel thread that observes the subprocess, and the subprocess does some heavy lifting with neural networks. The architecture looks somewhat like this:

I need to be able to cancel the neural network process and, correspondingly, end the sentinel thread. I have created a small example which shows the general architecture and what I am trying to do.

    from multiprocessing import Process, Queue
    from threading import Thread
    from time import sleep


    class Worker(Process):
        # The worker resembles the neural network. It does some calculations and shares
        # the information via the queue.
        def __init__(self, queue: Queue):
            Process.__init__(self)
            self.queue = queue

        def run(self):
            i = 0
            while True:
                self.queue.put(i)
                i += 1

        def stop(self):
            # I used the stop function for trying out some things, like using a joinable
            # queue and block execution as long as the queue is not empty, which is not
            # working
            self.queue.put(None)
            self.terminate()


    class Listener(Thread):
        # This class resembles the sentinel thread. It checks in an infinite loop for
        # messages. In the real application I send signals via the signals and slots
        # design pattern to the gui and display the sent information.
        def __init__(self):
            Thread.__init__(self)
            self.queue = Queue()
            self.worker = Worker(self.queue)

        def run(self):
            self.worker.start()
            while True:
                data = self.queue.get()
                if data is not None:
                    print(data)
                else:
                    break
            print("broken")

        def stop(self):
            self.worker.stop()


    class System:
        # This class resembles the gui
        def __init__(self):
            self.listener = Listener()

        def start(self):
            self.listener.start()

        def stop(self):
            self.listener.stop()


    if __name__ == "__main__":
        system = System()
        system.start()
        sleep(0.1)
        system.stop()

What is the problem

As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is basically a deadlock in some sense. Therefore I need to find a way to properly handle the queue when terminating the process, so that the processes terminate without errors.

What I have tried so far:
The results:
An attempt at (3) is the following:

    from multiprocessing import Process, Queue, Lock
    from signal import signal, SIGTERM
    from sys import exit


    class Worker(Process):
        def __init__(self, queue: Queue):
            Process.__init__(self)
            self.queue = queue
            self.abort = False
            self.lock = Lock()
            signal(SIGTERM, self.stop)

        def run(self):
            i = 0
            while True:
                self.lock.acquire()
                if self.abort:
                    break
                else:
                    self.queue.put(i)
                    i += 1
                self.lock.release()
            exit(0)

        def stop(self, sig, frame):
            self.abort = True
            self.queue.put(None)
            self.queue.join()
            exit(0)

RE: Terminating Subprocesses and Threads while they're calculating - Gribouillis - Oct-16-2020

As a workaround, I suggest using Queue.get() with a timeout, say half a second, in the sentinel thread instead of a blocking Queue.get(), and checking whether the worker process is still running when the timeout occurs. This gives the sentinel thread an option to exit instead of waiting for queue events when the worker process is already dead.
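The timeout-based polling suggested above could look roughly like the following. This is only a minimal sketch, not the poster's code: the names `listen`, `worker_is_alive`, `poll_interval` and `finite_worker` are hypothetical, and the worker here exits on its own rather than being terminated.

```python
from multiprocessing import Process, Queue
from queue import Empty  # raised by multiprocessing.Queue.get() on timeout


def listen(queue, worker_is_alive, poll_interval=0.5):
    """Collect items until the queue stays quiet and the worker has exited.

    `worker_is_alive` is any zero-argument callable, e.g. `worker.is_alive`.
    """
    received = []
    while True:
        # Sample liveness before waiting: a worker that exited normally has
        # already flushed its data, so an empty wait afterwards means "done".
        alive = worker_is_alive()
        try:
            received.append(queue.get(timeout=poll_interval))
        except Empty:
            if not alive:
                break
    return received


def finite_worker(queue, n):
    """Hypothetical stand-in for the real worker: emits n integers and exits."""
    for i in range(n):
        queue.put(i)


if __name__ == "__main__":
    q = Queue()
    worker = Process(target=finite_worker, args=(q, 5))
    worker.start()
    items = listen(q, worker.is_alive, poll_interval=0.1)
    worker.join()
    print(items)
```

In the architecture from the first post, `listen` would be the body of the sentinel thread's run() method. Note that the multiprocessing documentation warns that data in a queue may become corrupted if a process is killed with terminate() while it is using the queue, so this sketch only guarantees a complete read when the worker exits by itself.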
RE: Terminating Subprocesses and Threads while they're calculating - lvlanson - Oct-16-2020

(Oct-16-2020, 07:08 PM)Gribouillis Wrote: As a workaround, I suggest to use

I guess this is a last-resort option. Data would still remain in the multiprocessing queue. My instinct tells me that interrupt handlers or asynchronous event handlers should do the trick somehow.

RE: Terminating Subprocesses and Threads while they're calculating - Gribouillis - Oct-17-2020

There are still things to try. You could call queue.close() and queue.join_thread() in the worker process when it receives the instruction to stop. This would wait until all data previously sent to the queue has been written to the underlying pipe. It would also raise an exception if the run() function still tried to put() anything else in the queue.
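One way to read this suggestion is sketched below, under assumptions that are not in the thread: the "instruction to stop" is modelled as a multiprocessing.Event, the queue is bounded with an arbitrary maxsize, and `produce` and `SENTINEL` are hypothetical names. Here close() and join_thread() run inside the worker process itself, so the worker leaves on its own and terminate() is not needed.

```python
from multiprocessing import Event, Process, Queue
from time import sleep

SENTINEL = None  # end-of-stream marker, as in the first post


def produce(queue, stop_event):
    """Emit integers until stop_event is set, then shut the queue down."""
    i = 0
    while not stop_event.is_set():
        queue.put(i)
        i += 1
    queue.put(SENTINEL)  # tell the reader that nothing more will follow
    queue.close()        # no further put() from this process
    queue.join_thread()  # wait until buffered items are written to the pipe
    return i


if __name__ == "__main__":
    q = Queue(maxsize=1000)  # assumption: bounded, so the producer cannot run far ahead
    stop = Event()
    worker = Process(target=produce, args=(q, stop))
    worker.start()
    sleep(0.1)
    stop.set()  # the "instruction to stop"
    received = 0
    # Keep reading until the sentinel arrives; while the pipe is full, the
    # worker's join_thread() has to wait for the reader to make room.
    while q.get() is not SENTINEL:
        received += 1
    worker.join()
    print(f"received {received} items, worker alive: {worker.is_alive()}")
```

The design choice here is cooperative shutdown: the reader drains everything up to the sentinel, so no data is left behind in the queue when the worker exits.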
RE: Terminating Subprocesses and Threads while they're calculating - lvlanson - Oct-17-2020

(Oct-17-2020, 06:04 AM)Gribouillis Wrote: There are still things to try. You could call

This solution works:

    from multiprocessing import Process, Queue
    from threading import Thread
    from time import sleep


    class Worker(Process):
        def __init__(self, queue: Queue):
            Process.__init__(self)
            self.queue = queue

        def run(self):
            i = 0
            try:
                while True:
                    self.queue.put(i)
                    i += 1
            except ValueError:
                # put() on a closed queue raises ValueError (Python 3.8+)
                pass

        def stop(self):
            # Called from the parent process via Listener.stop()
            self.queue.put("END")
            self.queue.close()
            self.queue.join_thread()
            self.terminate()


    class Listener(Thread):
        def __init__(self):
            Thread.__init__(self)
            self.queue = Queue()
            self.worker = Worker(self.queue)
            self.is_running = True

        def run(self):
            self.worker.start()
            while self.is_running:
                try:
                    data = self.queue.get()
                    if data == "END":
                        self.is_running = False
                except TypeError:
                    self.is_running = False

        def stop(self):
            self.worker.stop()
            self.is_running = False


    class System:
        def __init__(self):
            self.listener = Listener()

        def start(self):
            self.listener.start()

        def stop(self):
            self.listener.stop()


    if __name__ == "__main__":
        system = System()
        system.start()
        sleep(0.1)
        system.stop()
        sleep(1)
        # Check that both the worker process and the sentinel thread have ended
        print(f"Process Alive: {system.listener.worker.is_alive()}")
        print(f"Thread Alive: {system.listener.is_alive()}")