Oct-16-2020, 05:46 PM
I am working on a larger project where I have two threads (in the same process) and one separate process. One of the threads is the GUI, the other is a sentinel thread observing the subprocess, and the subprocess does some heavy lifting with neural networks. The architecture looks somewhat like this:
![[Image: 8uI4KBJ.png]](https://i.imgur.com/8uI4KBJ.png)
I need to be able to cancel the neural network process and then end the sentinel thread accordingly. I have created a small example which shows the architecture in general and what I am trying to do.
```python
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):
    # The worker resembles the neural network. It does some calculations and
    # shares the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1

    def stop(self):
        # I used the stop function for trying out some things, like using a
        # joinable queue and blocking execution as long as the queue is not
        # empty, which is not working
        self.queue.put(None)
        self.terminate()


class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop
    # for messages. In the real application I send signals via the signals and
    # slots design pattern to the gui and display the sent information.
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")

    def stop(self):
        self.worker.stop()


class System:
    # This class resembles the gui
    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
```
What is the problem
As long as one process reads from or writes to the queue, or the queue is not emptied properly, one or both of the processes turn into zombie processes, which is effectively a deadlock. I therefore need a way to handle the queue properly when terminating the process, so that both processes exit without errors.
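For comparison, a stripped-down version without the GUI and sentinel thread does terminate cleanly when the consumer drains the queue up to a sentinel before joining (`worker` and `stop_event` are placeholder names for this sketch, not my real code):

```python
from multiprocessing import Event, Process, Queue


def worker(queue, stop_event):
    # Placeholder producer: emits numbers until asked to stop, then puts a
    # sentinel so the consumer knows no more data will follow.
    i = 0
    while not stop_event.is_set():
        queue.put(i)
        i += 1
    queue.put(None)


if __name__ == "__main__":
    queue = Queue()
    stop_event = Event()
    producer = Process(target=worker, args=(queue, stop_event))
    producer.start()
    stop_event.set()
    # Drain up to the sentinel before joining: this flushes the queue's
    # internal feeder buffer, so join() cannot hang on unsent data.
    while queue.get() is not None:
        pass
    producer.join()  # exits cleanly, no zombie process
```

In the real application the draining would have to happen inside the sentinel thread, which is exactly the part I cannot get right.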
What I have tried so far:
1. Using a JoinableQueue, calling task_done() for each item and join() before terminating
2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied
3. Using a JoinableQueue and calling join() only within the SIGTERM signal handler
The results:
1. Processing speed collapsed greatly, but termination worked properly
2. Termination does not work the way I implemented it
3. Sometimes it worked, sometimes it did not, so this method gave no reliable result
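For context, variant (1) looked roughly like this (a minimal sketch with made-up names, not the real project code); the extra task_done() round-trip on every single queue element is what killed the throughput:

```python
from multiprocessing import JoinableQueue, Process


def producer(queue, n):
    # Placeholder producer: put n items, then block in join() until the
    # consumer has called task_done() once per item.
    for i in range(n):
        queue.put(i)
    queue.join()


if __name__ == "__main__":
    queue = JoinableQueue()
    proc = Process(target=producer, args=(queue, 1000))
    proc.start()
    for _ in range(1000):
        queue.get()
        # Per-item bookkeeping: each element costs an extra synchronization
        # step across the process boundary.
        queue.task_done()
    proc.join()  # the producer's join() has returned, so this does not hang
```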
An attempt for (3) is the following:
```python
from multiprocessing import JoinableQueue, Process
from signal import SIGTERM, signal
from sys import exit
from threading import Lock


class Worker(Process):
    def __init__(self, queue: JoinableQueue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)

    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)

    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
```
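One detail I am unsure about (this is an assumption from my experiments, and the sketch below uses placeholder names and assumes a POSIX platform where terminate() sends SIGTERM): calling signal() in __init__ also installs the handler in the parent process, because __init__ runs before the child is started. Registering the handler at the top of run() keeps it local to the child, and raising SystemExit from the handler lets the child unwind instead of being killed mid-put:

```python
import time
from multiprocessing import Event, Process
from signal import SIGTERM, signal


class Worker(Process):
    def __init__(self):
        Process.__init__(self)
        self.ready = Event()

    def run(self):
        # Register here, not in __init__: this code runs in the child process
        # only, so the parent's SIGTERM handling is left untouched.
        signal(SIGTERM, self._on_term)
        self.ready.set()
        while True:
            time.sleep(0.01)

    def _on_term(self, sig, frame):
        # Convert SIGTERM into a normal interpreter exit so cleanup can run.
        raise SystemExit(0)


if __name__ == "__main__":
    w = Worker()
    w.start()
    w.ready.wait()  # make sure the handler is installed before terminating
    w.terminate()   # delivers SIGTERM; the handler turns it into a clean exit
    w.join()
    # w.exitcode is 0 here, because SystemExit(0) counts as a normal exit
```

Is this the right direction, or is there a cleaner way to shut down the worker and the sentinel thread together?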