Oct-16-2020, 05:46 PM
I am working on a larger project with two threads (in the same process) and one separate process. One thread is the GUI, the other is a sentinel thread that observes the subprocess, and the subprocess does some heavy lifting with neural networks. The architecture looks roughly like this:
![[Image: 8uI4KBJ.png]](https://i.imgur.com/8uI4KBJ.png)
I need to be able to cancel the neural network process and, in turn, end the sentinel thread. I have created a small example that shows the general architecture and what I am trying to do.
What is the problem
As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is basically a deadlock in some sense. I therefore need a way to handle the queue properly when terminating the process, so that the processes terminate without errors.
```python
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1

    def stop(self):
        # I used the stop function for trying out some things, like using a joinable
        # queue and block execution as long as the queue is not empty, which is not
        # working
        self.queue.put(None)
        self.terminate()


class Listener(Thread):
    # This class resembles the sentinel thread. It checks in an infinite loop for
    # messages. In the real application I send signals via the signals and slots
    # design pattern to the gui and display the sent information.
    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")

    def stop(self):
        self.worker.stop()


class System:
    # This class resembles the gui
    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
```
What I have tried so far:
1. Using a `JoinableQueue` and calling `join()` for each `task_done()`
2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied
3. Using a `JoinableQueue` and calling `join()` only within the SIGTERM signal handler
The results:
1. Processing speed dropped sharply, but termination worked properly.
2. Termination did not work the way I implemented it.
3. Sometimes it worked, sometimes it did not, so this method gave no reliable result.
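The slowdown reported for the first approach is what one would expect if the producer waits for an acknowledgement after every single item. The code for that attempt is not shown, so the following is only a guess at its shape: the function names `producer` and `consume_all` and the parameter `n_items` are hypothetical. It uses `multiprocessing.JoinableQueue`, whose `join()` blocks until `task_done()` has been called for every item that was put.

```python
from multiprocessing import JoinableQueue, Process


def producer(queue, n_items):
    """Hypothetical worker: waits for an acknowledgement after every item."""
    for i in range(n_items):
        queue.put(i)
        # Blocks until the consumer has called task_done() for this item,
        # so every item costs one full round trip between the processes.
        queue.join()
    queue.put(None)  # sentinel: nothing more will follow
    queue.join()     # wait until the sentinel has been acknowledged as well


def consume_all(n_items=100):
    """Hypothetical listener: acknowledges each item right after reading it."""
    queue = JoinableQueue()
    process = Process(target=producer, args=(queue, n_items))
    process.start()
    received = []
    while True:
        item = queue.get()
        queue.task_done()  # releases the producer's pending join()
        if item is None:
            break
        received.append(item)
    # The queue has been read completely, so joining the process is safe here.
    process.join()
    return received, process.exitcode


if __name__ == "__main__":
    items, exitcode = consume_all(100)
    print(len(items), exitcode)
```

Because the producer never has more than one unacknowledged item in flight, the queue is always empty when the producer exits, which may be why termination was reliable in that variant while throughput suffered.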
My attempt at (3) is the following:
```python
class Worker(Process):
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)

    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                break
            else:
                self.queue.put(i)
                i += 1
            self.lock.release()
        exit(0)

    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
```
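One possible alternative that avoids both `terminate()` and signal handlers is a cooperative shutdown: the parent sets a `multiprocessing.Event`, the worker notices it, sends the sentinel itself, and exits normally, while the listener keeps reading until the sentinel arrives and only then joins the process. The sketch below is a minimal illustration under assumptions, not a drop-in fix: the names `worker`, `stop_event`, `received`, and `run_demo` and the `maxsize=1000` bound are made up for this example, and it only applies if the real computation can check the flag between units of work.

```python
from multiprocessing import Event, Process, Queue
from threading import Thread
from time import sleep


def worker(queue, stop_event):
    """Stand-in for the neural network: emits integers until asked to stop."""
    i = 0
    while not stop_event.is_set():
        queue.put(i)
        i += 1
    # The worker sends the sentinel itself, so it is always the last item.
    queue.put(None)


class Listener(Thread):
    """Sentinel thread: reads from the queue until the worker's sentinel arrives."""

    def __init__(self):
        super().__init__()
        # Assumption: a bounded queue, so the worker cannot run far ahead.
        self.queue = Queue(maxsize=1000)
        self.stop_event = Event()
        self.process = Process(target=worker, args=(self.queue, self.stop_event))
        self.received = 0

    def run(self):
        self.process.start()
        while True:
            data = self.queue.get()
            if data is None:
                break
            self.received += 1
        # Everything the worker put has been read, so join() is not
        # left waiting on unflushed queue data.
        self.process.join()

    def stop(self):
        # Only sets a flag; the worker is never killed in the middle of a put().
        self.stop_event.set()


def run_demo(duration=0.1):
    """Start the system, let it run briefly, then request a clean stop."""
    listener = Listener()
    listener.start()
    sleep(duration)
    listener.stop()
    listener.join(timeout=10)
    return listener.is_alive(), listener.process.exitcode, listener.received


if __name__ == "__main__":
    print(run_demo())
```

The design choice here is that the process that writes to the queue is also the one that closes the conversation, so the reader always sees a complete stream ending in `None`. If the worker cannot poll a flag (for example, a single long-running library call), this pattern does not help and a different strategy would be needed.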