Python Forum
Help with graceful shutdown of MP thread with asyncio
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Help with graceful shutdown of MP thread with asyncio
#1
This is my main code:

if __name__ == "__main__":
    queue_dict = {item[0]: mp.Queue() for item in symlist}

    # Start market data process
    market_data_proc = mp.Process(target=gogo_market_data, args=(queue_dict,), daemon=True)
    market_data_proc.start()

    # Start symbol-specific processes
    symbol_procs = []
    for item in symlist:
        p = mp.Process(target=start_symbol_process, args=(item, queue_dict[item[0]]))
        symbol_procs.append(p)
        p.start()

    # Start the termination listener in a separate thread
    termination_listener = threading.Thread(target=listen_for_termination, daemon=True)
    termination_listener.start()

    # Join processes
    try:
        while True:
            if termination_event.is_set():
                break
            for p in symbol_procs:
                p.join(1)
            market_data_proc.join(1)
    except KeyboardInterrupt:
        termination_event.set()

    print("Terminating all processes...")
    for p in symbol_procs:
        if p.is_alive():
            p.terminate()
    #if market_data_proc.is_alive():
    #    market_data_proc.terminate()
    print("All processes terminated.")
It calls a starter function to run asyncio on a particular multiprocessing thread:

# Starter function to start asyncio symbol processing
def start_symbol_process(symbol_list, data_queue): asyncio.run(process_symbol(symbol_list, data_queue))
That in turn calls the asyncio function that does the processing:

# This function streams market data from Polygon
async def get_market_data(queue_dict):
    
    msg_delays = {'A': [], 'Q': [], 'T': []}

    async with websockets.connect(POLY_WEBSOCKETS_URL) as websocket:
        await websocket.send(json.dumps({"action": "auth", "params": POLY_API_KEY}))
        # Build the string of the services that we are going to subscribe to - need to sub to second aggs and quotes for each
        param_string = ''
        #for item in symlist: param_string += 'A.' + item[0] + ','
        #for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ','
        for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ',T.' + item[0] + ','
        
        param_string = param_string[:-1]
        await websocket.send(json.dumps({"action": "subscribe", "params": param_string}))

        # Receive messages
        while not termination_event.is_set():
            try:
                message = await websocket.recv()
                data = json.loads(message)
                # Iterate over each individual json message in a data packet
                for json_msg in data:
                    # Process aggregate message
                    if json_msg['ev'] == 'A': 
                        msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['e'] / 1000.0)).astimezone(ny_tz)
                        if log_pg_latency: 
                            msg_delays['A'].append(datetime.datetime.now(ny_tz) - msg_time)
                            #print('Aggregates time delay:', msg_delays['A'][-1])
                        queue_dict[json_msg['sym']].put(('A', (json_msg['v'], json_msg['vw'], json_msg['o'], json_msg['c'], json_msg['h'], json_msg['l'], msg_time)))
                    # Process quote message
                    elif json_msg['ev'] == 'Q': 
                        msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz)
                        if log_pg_latency: 
                            msg_delays['Q'].append(datetime.datetime.now(ny_tz) - msg_time)
                            #print('Quotes time delay:', msg_delays['Q'][-1])
                        queue_dict[json_msg['sym']].put(('Q', json_msg['bp'], json_msg['ap']))
                    elif json_msg['ev'] == 'T':
                        msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz)
                        if log_pg_latency: 
                            msg_delays['T'].append(datetime.datetime.now(ny_tz) - msg_time)
                            #print('Trades time delay:', msg_delays['T'][-1])
                #print(json.dumps(data, indent=4))
            except websockets.ConnectionClosed: break
        await websocket.close()
    print("Termination event set, processing msg_delays...")
    print(msg_delays)
get_market_data(queue_dict) pulls live stock market data and queue_dict is a mp.Queue() that I use to get this data out to other multiprocessing threads for analysis.

I have:
# Function to listen for keyboard input and set the termination event
def listen_for_termination():
    print("Press 'x' to terminate the script.")
    while True:
        if sys.stdin.read(1).strip().lower() == 'x':
            termination_event.set()
            break
Set as a function to listen for a termination signal. It works.

The thing is, it just terminates everything, and I need certain events to happen during the termination sequence. In get_market_data at the bottom you see

print("Termination event set, processing msg_delays...")
    print(msg_delays)
I need those things, and additional code I have yet to write, to run when terminating. But that code never runs it just terminates... I cannot figure out why any help would be appreciated.
Reply
#2
This might be helpful The little book of semiphores
It's been helpful in my own async and threaded code, and it's free.
Reply
#3
You are making this really difficult(or may be just wrong approch) bye mixing multiprocessing, threading, asyncio in same code.
I think no one will look at code as it is now(not complete) eg missing imports will show that you use 3 mention.
import multiprocessing as mp
import threading
import asyncio
Can maybe look at it if you post whole code,or maybe not as reason posted.
Reply
#4
First off, thanks for the book of semiphores, that is interesting I will need to make some time to read it.

I can't post the entire code because it contains a stock trading algo which is proprietary.

Reason I'm using multiprocessing and asyncio is we are getting stock market data from polygon.io which is just a data provider. One thread just pre-processes this data and sends it elsewhere. Then, each individual stock symbol being traded will be on it's own thread as they are independent from each other. I'm using asyncio because for each individual symbol there will be a ton of trading actions sent to/from broker via a REST API.

I also figured out the problem in my above code, termination_event is a mp.Event() but I was not properly passing it to each individual function - therefore when termination_event was set, functions weren't properly detecting it as set. I fixed the passing issue now it is working as expected.
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
Lightbulb shutdown host from docker container cosmin1805 0 1,603 Nov-27-2022, 06:34 PM
Last Post: cosmin1805
  Os.system("shutdown"); command not found cosmin1805 4 3,139 Nov-13-2022, 02:07 PM
Last Post: cosmin1805
  Possible to execute a python script before log off/shutdown with input commands? Kaltex 1 3,082 May-18-2021, 06:31 AM
Last Post: Skaperen
  Error SQLite objects created in a thread can only be used in that same thread. binhduonggttn 3 19,077 Jan-31-2020, 11:08 AM
Last Post: DeaD_EyE
  Detecting windows shutdown event riccardoob 4 7,504 Nov-12-2019, 04:51 PM
Last Post: Aurthor_King_of_the_Brittons
  how this to make a asyncio thread safe? linziyan 0 2,970 Jun-07-2018, 10:33 AM
Last Post: linziyan
  Asyncio within a thread hollymcr 2 89,557 Mar-12-2018, 08:36 AM
Last Post: hollymcr
  Shutdown from remote trough Android andrea1980345 6 8,654 Nov-15-2016, 10:17 PM
Last Post: andrea1980345

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020