Dec-04-2016, 02:58 PM
I have a socket receiving a TCP stream that WAS getting bottlenecked by some of the processing. I fixed the problem by giving the socket its own dedicated process with multiprocessing.Process and having it pass its received data to other processes via multiprocessing.Pipe. It works, but it's really ugly and just feels wrong. Is there a better way to do this?
from multiprocessing import Pipe, Process
from multiprocessing.managers import SyncManager
import signal
import socket


def mySock(HOST, PORT, pipe_tx):
    """Read a TCP stream from (HOST, PORT) and forward each chunk to pipe_tx.

    Args:
        HOST: Host name or address to connect to.
        PORT: TCP port to connect to.
        pipe_tx: Object with a ``send`` method (e.g. the sending end of a
            ``multiprocessing.Pipe``); it receives every chunk read from
            the socket.

    The loop ends when the peer closes the connection. The final empty
    chunk is forwarded once before stopping, so the consumer still sees
    the empty read but is no longer flooded with them.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect() lives inside the try so the socket is closed even when
        # the connection attempt itself fails.
        sock.connect((HOST, PORT))
        while True:
            data_chunk = sock.recv(4096)
            pipe_tx.send(data_chunk)
            if not data_chunk:
                # recv() returns an empty string once the peer has closed;
                # without this exit the loop would spin forever.
                break
    except KeyboardInterrupt:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("mySock received KeyboardInterrupt")
    finally:
        # close() is safe to call unconditionally; no need to inspect
        # private socket internals first.
        sock.close()


def worker(pipe_rx):
    """Placeholder consumer: receive data from pipe_rx and process it.

    Args:
        pipe_rx: Receiving end of the pipe fed by ``mySock``.
    """
    pass


def init_mgr():
    """Initialize the manager process so that it ignores SIGINT."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    HOST, PORT = "localhost", 12345
    processes = []

    manager = SyncManager()
    manager.start(init_mgr)

    # Simplex pipe: duplex=False yields a receive-only end followed by a
    # send-only end.
    pipe_rx, pipe_tx = Pipe(duplex=False)

    try:
        # Socket process
        p = Process(target=mySock, args=(HOST, PORT, pipe_tx))
        p.daemon = True
        p.start()
        processes.append(p)

        # Worker process
        p = Process(target=worker, args=(pipe_rx,))
        p.daemon = True
        p.start()
        processes.append(p)

        try:
            for process in processes:
                process.join()
        except KeyboardInterrupt:
            print("keyboardInterrupt in __main__")
    finally:
        # Always stop the manager process, even if starting a child failed.
        manager.shutdown()