Jul-02-2024, 11:14 PM
This is my main code:
I have:
The thing is, it just terminates everything, and I need certain events to happen during the termination sequence. In get_market_data at the bottom you see
if __name__ == "__main__": queue_dict = {item[0]: mp.Queue() for item in symlist} # Start market data process market_data_proc = mp.Process(target=gogo_market_data, args=(queue_dict,), daemon=True) market_data_proc.start() # Start symbol-specific processes symbol_procs = [] for item in symlist: p = mp.Process(target=start_symbol_process, args=(item, queue_dict[item[0]])) symbol_procs.append(p) p.start() # Start the termination listener in a separate thread termination_listener = threading.Thread(target=listen_for_termination, daemon=True) termination_listener.start() # Join processes try: while True: if termination_event.is_set(): break for p in symbol_procs: p.join(1) market_data_proc.join(1) except KeyboardInterrupt: termination_event.set() print("Terminating all processes...") for p in symbol_procs: if p.is_alive(): p.terminate() #if market_data_proc.is_alive(): # market_data_proc.terminate() print("All processes terminated.")It calls a starter function to run asyncio on a particular multiprocessing thread:
# Starter function to start asyncio symbol processing def start_symbol_process(symbol_list, data_queue): asyncio.run(process_symbol(symbol_list, data_queue))That in turn calls the asyncio function that does the processing:
# This function streams market data from Polygon async def get_market_data(queue_dict): msg_delays = {'A': [], 'Q': [], 'T': []} async with websockets.connect(POLY_WEBSOCKETS_URL) as websocket: await websocket.send(json.dumps({"action": "auth", "params": POLY_API_KEY})) # Build the string of the services that we are going to subscribe to - need to sub to second aggs and quotes for each param_string = '' #for item in symlist: param_string += 'A.' + item[0] + ',' #for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ',' for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ',T.' + item[0] + ',' param_string = param_string[:-1] await websocket.send(json.dumps({"action": "subscribe", "params": param_string})) # Receive messages while not termination_event.is_set(): try: message = await websocket.recv() data = json.loads(message) # Iterate over each individual json message in a data packet for json_msg in data: # Process aggregate message if json_msg['ev'] == 'A': msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['e'] / 1000.0)).astimezone(ny_tz) if log_pg_latency: msg_delays['A'].append(datetime.datetime.now(ny_tz) - msg_time) #print('Aggregates time delay:', msg_delays['A'][-1]) queue_dict[json_msg['sym']].put(('A', (json_msg['v'], json_msg['vw'], json_msg['o'], json_msg['c'], json_msg['h'], json_msg['l'], msg_time))) # Process quote message elif json_msg['ev'] == 'Q': msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz) if log_pg_latency: msg_delays['Q'].append(datetime.datetime.now(ny_tz) - msg_time) #print('Quotes time delay:', msg_delays['Q'][-1]) queue_dict[json_msg['sym']].put(('Q', json_msg['bp'], json_msg['ap'])) elif json_msg['ev'] == 'T': msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz) if log_pg_latency: msg_delays['T'].append(datetime.datetime.now(ny_tz) - msg_time) #print('Trades time delay:', msg_delays['T'][-1]) #print(json.dumps(data, indent=4)) except websockets.ConnectionClosed: break await websocket.close() print("Termination event set, processing msg_delays...") print(msg_delays)get_market_data(queue_dict) pulls live stock market data and queue_dict is a mp.Queue() that I use to get this data out to other multiprocessing threads for analysis.
I have:
# Function to listen for keyboard input and set the termination event def listen_for_termination(): print("Press 'x' to terminate the script.") while True: if sys.stdin.read(1).strip().lower() == 'x': termination_event.set() breakSet as a function to listen for a termination signal. It works.
The thing is, it just terminates everything, and I need certain events to happen during the termination sequence. In get_market_data at the bottom you see
print("Termination event set, processing msg_delays...") print(msg_delays)I need those things, and additional code I have yet to write, to run when terminating. But that code never runs it just terminates... I cannot figure out why any help would be appreciated.