Python Forum
waiting for a thread
#1
If I start some threads with threading.Thread(), how can I wait for any one of them to end? I don't know in advance the order they will finish in. It would be nice if it also had a timeout, although I could run a timer thread. It would be very nice to know which thread ended, but that is not absolutely essential.
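
For reference, nothing like join_any() exists in the standard library, but it can be approximated with a shared queue. A minimal sketch, assuming each worker is wrapped so it reports its own name when done (run_and_report, done_q, and work are made-up names):

import queue
import threading
import time

done_q = queue.Queue()

def run_and_report(func, *args):
    # wrapper: run the real work, then announce which thread finished
    func(*args)
    done_q.put(threading.current_thread().name)

def work(seconds):
    time.sleep(seconds)

threads = [threading.Thread(target=run_and_report, args=(work, s),
                            name=f'worker-{s}') for s in (3, 1, 2)]
for t in threads:
    t.start()

try:
    first = done_q.get(timeout=5)   # waits for ANY thread, with a timeout
    print(f'{first} finished first')
except queue.Empty:
    print('no thread finished within the timeout')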
Tradition is peer pressure from dead people

What do you call someone who speaks three languages? Trilingual. Two languages? Bilingual. One language? American.
#2
from threading import Thread
from queue import Queue
import time


END_THREAD = object()


def worker(tasks, queue):
    for task in range(1, tasks + 1):
        time.sleep(1)
        queue.put(task)
    # when finished, signalling the consumer to
    # finish
    queue.put(END_THREAD)


def consumer(queue):
    while True:
        task = queue.get()
        if task is END_THREAD:
            queue.task_done()
            return
        print(task)
        queue.task_done()


queue = Queue()
worker_thread = Thread(target=worker, args=[5, queue], daemon=True)
consumer_thread = Thread(target=consumer, args=[queue], daemon=True)

print('Starting Worker Thread')
worker_thread.start()

print('Starting Consumer Thread')
consumer_thread.start()

print('Joining worker_thread and consumer_thread')
worker_thread.join()
consumer_thread.join()
# if you remove the two method calls,
# the program won't wait for the daemon threads
# if they are non-daemon threads, the interpreter waits
# until they are finished


# you could also call queue.join(), but at the beginning
# the queue is empty, so it would return immediately

print('Program finished')

# in addition, you could use other primitives like
# Lock, Event, Queue, Semaphore, ...
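
As one example of those primitives, here is a minimal sketch (the names are made up) using an Event for the "wait for any thread, with a timeout" case from the question; each worker records its name and sets the shared Event when it finishes:

import threading
import time

any_done = threading.Event()
finishers = []                       # records who set the event, in order

def work(name, seconds):
    time.sleep(seconds)
    finishers.append(name)           # list.append is thread-safe in CPython
    any_done.set()

for name, s in [('a', 3), ('b', 1), ('c', 2)]:
    threading.Thread(target=work, args=(name, s)).start()

if any_done.wait(timeout=5):         # True as soon as any worker calls set()
    print(f'first to finish: {finishers[0]}')
else:
    print('timed out')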
Q: What is the benefit of threads?
A: Shared memory.

Q: What is the problem with threads?
A: Shared memory.

Q: Why are processes better than threads?
A: Isolation, no shared memory.

Q: What is the problem with processes?
A: Missing shared memory...

The missing shared_memory problem for multiprocessing is solved with Python 3.8: https://docs.python.org/3.9/library/mult...emory.html
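
A minimal sketch of that 3.8+ API (shown within a single process for brevity; in real use the attach-by-name call would happen in another process):

from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
shm.buf[:5] = b'hello'                             # write through the memoryview

view = shared_memory.SharedMemory(name=shm.name)   # attach to the same block by name
print(bytes(view.buf[:5]))                         # b'hello'

view.close()
shm.close()
shm.unlink()                                       # release the segment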



A bit more advanced with multiple queues:
from random import random
import time
import threading
import multiprocessing
import queue


class Sentinel:
    def __init__(self, name):
        self._name = name

    def __eq__(self, other):
        if not hasattr(other, '_name'):
            return False
        return self._name == other._name

    def __repr__(self):
        return f'{self._name} sentinel'


END_THREAD = Sentinel('END_THREAD')
END_PROCESS = Sentinel('END_PROCESS')
FINISHED = Sentinel('FINISHED')


def random_sleep():
    t = random()
    time.sleep(t)


def thread_worker(in_q, out_q):
    for element in iter(in_q.get, END_THREAD):
        random_sleep()
        print(f'Thread Worker put: {element}')
        out_q.put(element)
    print('Thread worker finished')
    out_q.put(FINISHED)


def process_worker(in_q, out_q):
    for element in iter(in_q.get, END_PROCESS):
        random_sleep()
        print(f'Process Worker put: {element}')
        out_q.put(element)
    print('Process worker finished')
    out_q.put(FINISHED)


def process_results(in_q):
    for element in iter(in_q.get, FINISHED):
        random_sleep()
        print('Processing results', element)
    print('Processing results finished')


thread_q = queue.Queue()
process_q = multiprocessing.Queue()
thread_results_q = queue.Queue()
process_results_q = multiprocessing.Queue()

# start the workers: one as a thread, one as a process
threading.Thread(target=thread_worker, args=[thread_q, thread_results_q]).start()
multiprocessing.Process(target=process_worker, args=[process_q, process_results_q]).start()

# start two consumers, one as a process and one as a thread
multiprocessing.Process(target=process_results, args=[process_results_q]).start()
threading.Thread(target=process_results, args=[thread_results_q]).start()


# put tasks in the queues
for n in range(1, 6):
    thread_q.put(f'Thread Task {n}')
for n in range(10, 16):
    process_q.put(f'Process Task {n}')

# thread_q.put(END_PROCESS) # wrong sentinel
# process_q.put(END_THREAD) # wrong sentinel

# put stop sentinels into the worker queues
thread_q.put(END_THREAD) # right sentinel
process_q.put(END_PROCESS) # right sentinel

# they emit a FINISHED sentinel to the consumer threads
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
#3
I had thought about using a queue to wait for worker threads to finish, but thought it was not appropriate to add something like that, probably because I was originally designing around processes instead of threads. I was hoping for some kind of .join_one() or .join_any() method so I could avoid adding a queue. The problem I ran into with processes is that each process would exec*() a command, and all I had was a SIGCLD signal when the process exited; the thread was just a way around that. Each thread will be running a command and waiting for that specific process. In C, I would use the wait() syscall and discover which pid exited from its return value. But I wanted to accomplish this purely in Python, if not pythonically: no C-like techniques and no os.* workarounds. So I think I can do this now, and will code up a module to encapsulate the methodology away from the application. Thanks!!

There is one more problem with processes: on some systems, some users are limited in the number of processes they can create. At a school I used to work at, students were limited to 10 processes; fork() would fail if they were already running 10 (which made it hard to even run "ps" to see what to kill). Some of my earlier techniques for the program I needed threads or processes for ended up running 2 processes per command. That's why threads became attractive.

My design thought now is for the module/class to start an "agent thread", which is given a command by the main thread calling a method to submit it. That method puts the command on a queue, where the agent will eventually get it once the process count is below the limit. The agent starts a worker thread, passing it the command in args. The worker thread executes the command, perhaps with subprocess.call(). When the command process finishes, the worker puts the command status on the queue back to the agent. I may need a third thread so the agent can wait for either a new command or a finished worker; or maybe just use a single queue that the agent always waits on, and have the agent keep pending commands on a list of its own (see the sketch below). With none of the threads doing anything heavy or time-critical, I don't think the GIL will be an issue.
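
A rough sketch of that single-queue idea (all names here are hypothetical, and 'echo'/'sleep' are just placeholder commands): the agent is the only consumer of agent_q, while the submitter and the workers are both producers, so the agent never has to wait on two things at once:

import queue
import subprocess
import threading

agent_q = queue.Queue()
MAX_PROCS = 4

def worker(cmd):
    status = subprocess.call(cmd)         # run the command and wait for it
    agent_q.put(('done', cmd, status))    # report completion to the agent

def agent():
    pending, running, quitting = [], 0, False
    while True:
        kind, *payload = agent_q.get()    # the one place the agent waits
        if kind == 'cmd':
            pending.append(payload[0])
        elif kind == 'done':
            running -= 1
            print('finished:', payload[0], 'status', payload[1])
        elif kind == 'quit':
            quitting = True
        # launch queued commands while under the process limit
        while running < MAX_PROCS and pending:
            threading.Thread(target=worker, args=(pending.pop(0),)).start()
            running += 1
        if quitting and running == 0 and not pending:
            return

threading.Thread(target=agent).start()
agent_q.put(('cmd', ['echo', 'hello']))
agent_q.put(('cmd', ['sleep', '1']))
agent_q.put(('quit',))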
Tradition is peer pressure from dead people

What do you call someone who speaks three languages? Trilingual. Two languages? Bilingual. One language? American.
#4
It seems to me that the way to go (a good design pattern) is for each thread that needs to receive communications from any other thread to have just one queue designated for it. That thread can then do queue.get() to wait for more work, with no need to figure out how to concurrently wait on two or more resources. For anything that needs to be waited on (even files coming over a slow NFS mount, something select() and poll() do not support at the system level), you just create a thread (which does not need a designated queue of its own) that puts to the queue of whatever thread needs to wait on that resource. I can now envision a bunch of classes for this, such as one to read text files and put lines on the requested queue.

Threads receiving communications of mixed types and/or from a mix of sources would get a list or dictionary that includes a source identity and/or a designation of a specific action. A sketch of the file-reader variant follows.
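
A hypothetical sketch of that pattern (the file name and helper names are made up): a reader thread feeds lines onto the consumer's designated queue, each item tagged with its kind and source so mixed producers could share the same queue:

import queue
import threading

def line_feeder(path, out_q):
    # producer thread: needs no queue of its own, it only puts
    with open(path) as f:
        for line in f:
            out_q.put(('line', path, line.rstrip('\n')))
    out_q.put(('eof', path, None))

def consumer(in_q):
    while True:
        kind, source, data = in_q.get()   # the only thing this thread waits on
        if kind == 'eof':
            print(f'{source}: done')
            return
        print(f'{source}: {data}')

q = queue.Queue()
threading.Thread(target=line_feeder, args=('some_file.txt', q)).start()
consumer(q)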
Tradition is peer pressure from dead people

What do you call someone who speaks three languages? Trilingual. Two languages? Bilingual. One language? American.
#5
I put together a trial of this design pattern in the form of a class that manages an asynchronous command pool. It was just written and needs more work and review before I even do alpha testing, but I think it is complete enough to show the way I am thinking about this design pattern, which does as much as it can with threads to minimize the number of processes.

One catch is that a command pipeline of any length counts as 1 process. I should add an option to count pipelines by how many processes they will actually run.

A use case I have for this is to turbo-boost uploads to AWS S3. Uploading in parallel is not only OK on S3, it is encouraged; serial uploading (each file waiting until the previous upload completes) can be very slow. Another use case is parallel file compression and/or encryption to make better use of many cores. I'm sure I'll come up with more. At least the GIL will not have much of an impact, since the real work is done in separate processes.

And now for the (incomplete) code I have, so far:
from threading import Thread
from queue import SimpleQueue
from subprocess import Popen, PIPE

__license__ = """
Copyright © 2019, by Phil D. Howard - all other rights reserved

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA, OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE, OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

The author may be contacted by decoding the number
11054987560151472272755686915985840251291393453694611309
(try converting the number to binary)
"""

def create_thread(**opts):
    """Create a thread.
The return to the caller is the thread reference by itself.
Example usage:  thread = create_thread(target=,args=,kwargs=)
"""
    return Thread(**opts)


def create_thread_with_queue(**opts):
    """Create a thread+queue combo.
The queue is passed to the thread target function in keyword argument 'queue'.
The return to the caller is a 2-tuple with the thread and the queue.
Example usage:  thread, queue = create_thread_with_queue(target=,args=,kwargs=)
"""
    queue = SimpleQueue()
    kwargs = opts.get('kwargs', {})
    kwargs['queue'] = queue
    opts['kwargs'] = kwargs
    return Thread(**opts), queue


class cmd_pool:
    """Class to implement an asynchronous command execution pool that limits
the number of commands or command pipelines that can run concurrently.

The available methods:
    obj = cmd_pool()
        Create a cmd_pool instance.
    obj.set_max(maximum_number_of_commands_or_command_pipelines)
        Set the maximum number of commands or command pipelines that
        may be run concurrently by this instance.
    obj.set_print(file)
        Set a file to print informational messages to.
    obj.run([solocommand,strings,...])
        Submit one simple non-pipeline command to be eventually run.
    obj.run([[command1,strings,...],[command2,strings,...],...])
        Submit one command pipeline to be eventually run.
    obj.join()
        Join with the cmd_pool to wait for completion of all commands.
    del obj
        Release all memory used by the object instance and remove this
        reference.
"""

    def __init__(self):
        """Create a cmd_pool instance.
"""
        self.agent_thread, self.agent_queue = create_thread_with_queue(target=self._agent)
        self.agent_thread.start()
        return

    def _agent(self, **kwargs):
        """The agent manages the command execution pool and returns when all
submitted commands have finished.
"""
        agent_queue = kwargs['queue']
        cmdlist = []
        count = 0
        usage = 0
        maxim = 0x7fffffffffffffff
        prtfile = None   # reserved for set_print(); not used yet
        ended = False
        while True:
            # start as many pending commands as the limit allows
            while usage < maxim and cmdlist:
                Thread(target=self._worker,
                       args=(agent_queue, cmdlist.pop(0), count)).start()
                count += 1
                usage += 1
            msg = agent_queue.get()
            if not isinstance(msg, list):
                raise TypeError(f'_agent: got bad object from queue, not a list: {msg!r}')
            # each message is a flat list of keyword,value pairs
            while msg:
                if msg[0] == 'cmd':
                    cmdlist.append(msg[1])
                elif msg[0] == 'end':
                    usage -= 1
                    ended = True
                elif msg[0] == 'max':
                    maxim = msg[1]
                elif msg[0] == 'prt':
                    prtfile = msg[1]
                msg = msg[2:]
            # finished once at least one command has run and none remain
            if ended and usage == 0 and not cmdlist:
                break
        return

    def _worker(self, agent_queue, cmdline, count):
        """The worker thread starts a command or command pipeline, then waits
for it to end to notify the agent.
"""
        opts = {}
        proc = []
        # chain each command's stdout to the next command's stdin
        for cmd in cmdline[:-1]:
            proc.append(Popen(cmd, stdout=PIPE, **opts))
            opts['stdin'] = proc[-1].stdout
        # the last command's output is discarded for now
        opts['stdout'] = open('/dev/null', 'w')
        proc.append(Popen(cmdline[-1], **opts))
        opts['stdout'].close()
        for p in proc:
            p.wait()
        agent_queue.put(['end', count])
        return

    def join(self):
        """Wait for all commands to end.
"""
        return self.agent_thread.join()

    def set_max(self,max_cmds):
        """Set the maximum number of concurrent commands or command lines.
"""
        if not isinstance(max_cmds,int):
            raise TypeError(f'argument is not an int; it is {max_cmds!r}')
        self.agent_queue.put(['max',max_cmds])
        return

    def set_print(self,print_file):
        """Set a file to print informational messages to.
"""
        self.agent_queue.put(['prt',print_file])
        return

    def run(self, cmd_line):
        """Submit a command or command pipeline to eventually be run asynchronously.
A command is given as a list of strings like ['ls','-l'].
A command pipeline is given as a list of commands.
A tuple is allowed in place of a list.
"""
        if not cmd_line:
            raise TypeError(f'cmd_pool.run(): argument 0 is empty or otherwise untrue: {cmd_line!r}')
        if not isinstance(cmd_line, (list, tuple)):
            raise TypeError(f'cmd_pool.run(): argument 0 is not a list or tuple: {cmd_line!r}')
        if isinstance(cmd_line[0], (str, bytes, bytearray)):
            if not all(isinstance(x, (str, bytes, bytearray)) for x in cmd_line):
                raise TypeError(f'cmd_pool.run(): a list in argument 0 contains one or more non-string items: {cmd_line!r}')
            # a single command becomes a pipeline of length 1
            cmd_line = [cmd_line]
        else:
            if not all(isinstance(x, (list, tuple)) for x in cmd_line):
                raise TypeError(f'cmd_pool.run(): a list in argument 0 contains one or more non-list items: {cmd_line!r}')
            if not all(all(isinstance(y, (str, bytes, bytearray)) for y in x) for x in cmd_line):
                raise TypeError(f'cmd_pool.run(): a list in a list in argument 0 contains one or more non-string items: {cmd_line!r}')
        return self.agent_queue.put(['cmd', cmd_line])
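
A hypothetical usage sketch of the class above (the commands are arbitrary POSIX examples):

pool = cmd_pool()
pool.set_max(2)                          # at most 2 concurrent commands
pool.run(['ls', '-l'])                   # a simple command
pool.run([['cat', '/etc/hostname'], ['tr', 'a-z', 'A-Z']])   # a pipeline
pool.join()                              # wait for all submitted commands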
Tradition is peer pressure from dead people

What do you call someone who speaks three languages? Trilingual. Two languages? Bilingual. One language? American.
#6
A more up-to-date copy is here.
Tradition is peer pressure from dead people

What do you call someone who speaks three languages? Trilingual. Two languages? Bilingual. One language? American.

