Dec-17-2024, 04:27 PM
The main function simulates receiving data as fast as possible and forwarding it to other subprocesses for processing. In the real application there will be multiple subprocesses; only one is tested here. The computation in a subprocess must not block data reception, and the computation time is unpredictable and may be long. Below is my test code — I would like to know how to optimize it to make it faster.
import multiprocessing
import threading
import collections
import time


def doCount(data, num):
    """Simulate processing one received message.

    Returns the updated item count. (The original incremented ``num``
    locally, which had no effect on the caller's counter; returning the
    new value makes the count meaningful.)
    """
    num = num + 1
    # Simulated, variable-length computation.
    time.sleep(0.000001)
    return num


def threadWorker(d, event):
    """Consume items from deque *d* until a ``None`` sentinel is seen.

    Blocks on *event* when the deque is empty so the loop does not
    busy-spin while waiting for the receiver to append more data.
    """
    tstart = time.time_ns()
    num = 0
    while True:
        if d:
            data = d.popleft()
            if data is None:  # sentinel: producer is done
                break
            num = doCount(data, num)
        else:
            event.clear()
            # Re-check after clear(): the producer may have appended and
            # set() between our emptiness check and clear(). Without this
            # re-check that wakeup is lost and we would wait forever even
            # though data (possibly the sentinel) is already queued.
            if d:
                continue
            event.wait()
    tend = time.time_ns()
    print('thread-recv:', (tend - tstart) / 1000000)


def processWorker(conn):
    """Receive from *conn* as fast as possible, handing each message to a
    worker thread so slow computation never stalls reception."""
    d = collections.deque()  # append/popleft are atomic, safe across threads
    event = threading.Event()
    t = threading.Thread(target=threadWorker, args=(d, event), daemon=True)
    t.start()
    cstart = time.time_ns()
    while True:
        try:
            data = conn.recv()
        except EOFError:
            # Writer closed without sending the sentinel: forward one
            # ourselves, otherwise t.join() below would hang forever.
            d.append(None)
            event.set()
            break
        d.append(data)
        event.set()
        if data is None:  # sentinel already forwarded to the thread
            break
    conn.close()
    cend = time.time_ns()
    print('recv:', (cend - cstart) / 1000000)
    print('thread-wait:', time.time_ns())
    t.join()
    print('thread-end:', time.time_ns())


def main():
    """Producer: pipe 100k dict messages to the worker process, then a
    ``None`` sentinel to signal end of stream."""
    read_conn, write_conn = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=processWorker, args=(read_conn,))
    p.start()
    # The parent never reads; drop its copy of the read handle so the
    # child holds the only reference.
    read_conn.close()
    pstart = time.time_ns()
    for i in range(100000):
        write_conn.send(
            {"code": f"code {i}", "time": time.time_ns(), "type": 1,
             "num": 100000, "aaa": "ddd", "bbbb": "dddd", "ccc": "dddd",
             "fff": "dddd", "ggg": "dddd", "hhh": "dddd", "iii": "dddd"})
    write_conn.send(None)
    write_conn.close()
    pend = time.time_ns()
    print('send:', (pend - pstart) / 1000000)
    p.join()


if __name__ == '__main__':
    main()