Python Forum

Full Version: Multiprocessing.Queue with hugh data causes _wait_for_tstate_lock
You're currently viewing a stripped down version of our content. View the full version with proper formatting.
X-Post on StackOverflow

An Exception is raised in threading._wait_for_tstate_lock when I transfere hugh data between a Process and a Thread via multiprocessing.Queue.

I have the vague idea that it has something to do with the fact that a multiprocessing Queue uses a internal Thread and a buffer. But I do not fully understand it.

This is the output and error message of my application
Output:
Running MyProcess... MyProcess stoppd. ^CProcess MyProcess-1: Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'> Traceback (most recent call last): File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown t.join() File "/usr/lib/python3.5/threading.py", line 1054, in join self._wait_for_tstate_lock() File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock elif lock.acquire(block, timeout): KeyboardInterrupt Traceback (most recent call last): File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap util._exit_function() File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function _run_finalizers() File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers finalizer() File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__ res = self._callback(*self._args, **self._kwargs) File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join thread.join() File "/usr/lib/python3.5/threading.py", line 1054, in join self._wait_for_tstate_lock() File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock elif lock.acquire(block, timeout): KeyboardInterrupt
This is the example code
#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()

The question was answered on StackOverflow: https://stackoverflow.com/a/56324244/4865723 .

I think I totally missunderstand the meaning of join.