Python Forum
Terminating Subprocesses and Threads while they're calculating
#1
I am working on a larger project with two threads (in the same process) and one separate process. One thread is the GUI, the other is a sentinel thread observing the subprocess, and the subprocess does the heavy lifting with neural networks. The architecture looks roughly like this:

[Image: 8uI4KBJ.png]

I need to be able to cancel the neural-network process and, along with it, end the sentinel thread. I have created a small example that shows the general architecture and what I am trying to do.

from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):
    # The worker resembles the neural network. It does some calculations and shares
    # the information via the queue.
    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        while True:
            self.queue.put(i)
            i += 1

    def stop(self):
        # I used this stop function to try out some things, e.g. using a joinable
        # queue and blocking execution while the queue is not empty, but that
        # did not work.
        self.queue.put(None)
        self.terminate()


class Listener(Thread):
    # This class resembles the sentinel thread. It checks for messages in an
    # infinite loop. In the real application I send signals to the GUI via the
    # signals-and-slots pattern and display the received information.

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)

    def run(self):
        self.worker.start()
        while True:
            data = self.queue.get()
            if data is not None:
                print(data)
            else:
                break
        print("broken")

    def stop(self):
        self.worker.stop()


class System:
    # This class resembles the gui

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
What is the problem?
As long as a process reads from or writes to the queue, and/or the queue is not emptied properly, one or both of the processes become zombie processes, which is essentially a deadlock. I therefore need a way to handle the queue properly when terminating the process, so that both processes exit without errors.

What I have tried so far:
  1. Using a JoinableQueue, calling task_done() for each item and join()
  2. Rewriting the SIGTERM signal handler to wait for the queue to be emptied
  3. Using a JoinableQueue and calling join() only within the SIGTERM signal handler

The results:
  1. Processing speed collapsed dramatically, but termination worked properly
  2. Termination did not work the way I implemented it
  3. Sometimes it worked, sometimes it did not, so this method gave no reliable result

An attempt for (3) is the following:

from multiprocessing import JoinableQueue, Lock, Process
from signal import SIGTERM, signal
from sys import exit


class Worker(Process):

    def __init__(self, queue: JoinableQueue):
        Process.__init__(self)
        self.queue = queue
        self.abort = False
        self.lock = Lock()
        signal(SIGTERM, self.stop)

    def run(self):
        i = 0
        while True:
            self.lock.acquire()
            if self.abort:
                self.lock.release()
                break
            self.queue.put(i)
            i += 1
            self.lock.release()
        exit(0)

    def stop(self, sig, frame):
        self.abort = True
        self.queue.put(None)
        self.queue.join()
        exit(0)
#2
As a workaround, I suggest using Queue.get() with a timeout, say half a second, in the sentinel thread instead of a blocking Queue.get(), and checking whether the worker process is still running when the timeout occurs. This gives the sentinel thread a chance to exit instead of waiting for queue events when the worker process is already dead.
#3
(Oct-16-2020, 07:08 PM)Gribouillis Wrote: As a workaround, I suggest to use Queue.get() with a timeout, say half a second in the sentinel thread instead of a blocking Queue.get() and check if the worker process is still running when the timeout occurs. This gives the sentinel thread an option to exit instead of waiting for queue events when the worker thread is already dead.

I guess this is a last-resort option. Data would still remain in the multiprocessing queue. My instinct tells me that interrupt handlers / asynchronous event handlers should somehow do the trick.
#4
There are still things to try. You could call queue.close() and queue.join_thread() in the worker process when it receives the instruction to stop. This would wait until all data previously sent to the queue has been flushed to the underlying pipe. It would also raise an exception if the run() function still tried to put() anything else into the queue.
#5
(Oct-17-2020, 06:04 AM)Gribouillis Wrote: There are still things to try. You could call queue.close() and queue.join_thread() in the worker process when it receives the instruction to stop. This would wait until every data previously sent to the queue has been written in the underlying pipe. Also it would raise an exception if the run() function still tries to put() anything else in the queue.

This solution works:
from multiprocessing import Process, Queue
from threading import Thread
from time import sleep


class Worker(Process):

    def __init__(self, queue: Queue):
        Process.__init__(self)
        self.queue = queue

    def run(self):
        i = 0
        try:
            while True:
                self.queue.put(i)
                i += 1
        except ValueError:
            pass

    def stop(self):
        self.queue.put("END")
        self.queue.close()
        self.queue.join_thread()
        self.terminate()


class Listener(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.queue = Queue()
        self.worker = Worker(self.queue)
        self.is_running = True

    def run(self):
        self.worker.start()
        while self.is_running:
            data = self.queue.get()
            if data == "END":
                self.is_running = False

    def stop(self):
        self.worker.stop()
        self.is_running = False


class System:

    def __init__(self):
        self.listener = Listener()

    def start(self):
        self.listener.start()

    def stop(self):
        self.listener.stop()


if __name__ == "__main__":
    system = System()
    system.start()
    sleep(0.1)
    system.stop()
    sleep(1)
    print(f"Process Alive: {system.listener.worker.is_alive()}")
    print(f"Thread Alive: {system.listener.is_alive()}")

