Python Forum
Multiprocessing doesn't work in this code
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Multiprocessing doesn't work in this code
#1
Hi there,
I wrote this simple packet interceptor app using pypacker. when I run this code by just calling the start_monitor function it works fine. However, when I run start_monitor() function using multiprocessing library the MonitorTraffic().start_monitoring() function is called (the log is printed on the screen) but it doesn't intercept the packets. No error is thrown. I thought the threading causes this problem, I have commented the related lines but the problem still persists.


    """
    Interceptor example using ICMP
    
    Requirements:
    sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass
    """
    # import time
    
    from pypacker import interceptor
    from pypacker.layer3 import ip
    import zmq
    import logging
    import threading
    from multiprocessing import current_process, Process
    import os
    logging.basicConfig(level=logging.DEBUG)
    
    
    class MonitorTraffic(object):
        def __init__(self, serv_ip='localhost', serv_port=7778):
            self.context = zmq.Context()
            self.req_socket = None
    
            self.req_port = serv_port
            self.server_ip = serv_ip
            self.client_id = 0
            self.sent_flows = set()
            self.lock = threading.Lock()
    
        def connect_to_server(self):
            """
                It connects the REQ socket to the server's REP socket
            :return: None
            """
            logging.debug("[Traffic-insp] [REQ:conn_to_serv] Connecting to the server ....")
            self.req_socket = self.context.socket(zmq.REQ)
            req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port)
            self.req_socket.connect(req_conn_str)
    
        def send_flow_info(self, flow_info):
            """
                Sends the flow information to the server, and stores the info in the self.sent_flows.
            :param flow_info: flow_info = (src, dst)
            :return: none
            """
            flow = {
                "client_id": self.client_id,
                "src": flow_info[0],
                "dst": flow_info[1]
            }
            self.req_socket.send_json(flow)
            self.req_socket.recv()
            # flow_info = ()
            logging.debug('[TrafficMonitor] flow <%s> datatype %s' % (flow_info, type(flow_info)))
            self.lock.acquire()
            self.sent_flows.add(flow_info)  # add it to the dictionary only after server received it.
            self.lock.release()
            logging.debug('[TrafficMonitor] sent_flow <%s> ' % self.sent_flows)
    
        def verdict_cb(self, ll_data, ll_proto_id, data, ctx):
            """
                It is a call back function used by the Interceptor.
            :return: ACCPET verdict
            """
            # ip1 = ip.IP(data)
            ip1 = ip.IP(data)
    
            logging.debug('\n[Traffic-inspector]---->Packet received:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s'%
                           (ip1.src_s, ip1.dst_s, ip1))
    
            # flow_info = (ip1.src_s, ip1.dst_s)
            # if flow_info not in self.sent_flows:
            #     _t = threading.Thread(target=self.send_flow_info, args=(flow_info, ))
            #     _t.start()
    
            return ip1.bin(), interceptor.NF_ACCEPT
    
        def start_monitoring(self, queue, client_id):
            """
                Start the traffic monitoring.
            :param queue: queue numbers to be monitored
            :param client_id: id of client, used by server.
            :return: None
            """
            logging.debug('[TrafficMonitor][start-monitor] current_proc<%s>, queue %s' % (current_process().name, queue))
            self.client_id = client_id
            ictor = interceptor.Interceptor()
            ictor.start(self.verdict_cb, queue_ids=queue)
    
    
    def start_monitor(queue, id):
        monitor.start_monitoring(queue, id)
    
    
    if __name__ == '__main__':
        monitor = MonitorTraffic()
        while True:
            ans = input('pres y to monitor and x to exit')
            if ans == 'y':
                proc = Process(target=start_monitor, args=([0, 1, 2], 1))
                proc.start()
                # start_monitor([0, 1, 2], 1)

            elif ans == 'x':
                os._exit(0) 
I have re-implemented the code as following:


    from multiprocessing import Process
    #from class_dpi import Dpi
    import os
    
    """
    Interceptor example using ICMP
    
    Requirements:
    sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass
    """
    import time
    from multiprocessing import Process
    import os
    from pypacker import interceptor
    from pypacker.layer3 import ip
    
    class Dpi(object):
    	# ICMP Echo request intercepting
    	def verdict_cb(self, ll_data, ll_proto_id, data, ctx):
    		# ip1 = ip.IP(data)
    		ip1 = ip.IP(data)
    
    
    		print('\n---------->Packet recieved:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s '
    			%(ip1.src_s, ip1.dst_s, ip1))
    		return ip1.bin(), interceptor.NF_ACCEPT
    
    	def dpi_start(self):
    		ictor = interceptor.Interceptor()
    		ictor.start(self.verdict_cb, queue_ids=[0, 1, 2])
    
    
    #class PDpi(object):
    #	def __init__(self):
    #		self.dpi = Dpi()
    #	def start_int(self):
    #		proc = Process(target=self.dpi.dpi_start())
    #		proc.daemon = True
    #		proc.start()
    
    def start_int():
    		proc = Process(target=Dpi().dpi_start())
    		#proc.daemon = True
    		proc.start()
    		
    		
    if __name__ == '__main__':
    	while True :
    		a = input('press y to start: ')
    		if a == 'y':
    		    start_int()			
    
    		elif a == 'x':
    			os._exit(0)
This code works but I can't figure out what is the problem with the original one (the first code). Please let me know what the problem could be. It is messing with my head. I would appreciate it if any one could point out the problem.
Reply
#2
This has nothing to do with your problem (at least I don't believe it does), but
yo have duplicate code here:
from multiprocessing import Process
#from class_dpi import Dpi
import os
Reply
#3
(Dec-23-2017, 11:38 PM)Larz60+ Wrote: This has nothing to do with your problem (at least I don't believe it does), but
yo have duplicate code here:
from multiprocessing import Process
#from class_dpi import Dpi
import os
Thanks, you are right it was duplicate.

Any one know any way to diagnose such problems? Because I have no clue where to start for debugging this code...
Reply
#4
Just an idea, are you running your program from IDLE or another Integrated Development Environment tool? IDLE doesn't support multiprocessing if I remember well. Try running your code directly from the console.
Reply
#5
(Dec-24-2017, 10:20 AM)squenson Wrote: Just an idea, are you running your program from IDLE or another Integrated Development Environment tool? IDLE doesn't support multiprocessing if I remember well. Try running your code directly from the console.

I run the codes from terminal. The second multiprocessing code runs as expected but not the first one.
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Multiprocessing: Threads work well. Processes don't work. viyubu 11 1,765 Dec-03-2023, 08:50 PM
Last Post: snippsat
  hi need help to make this code work correctly atulkul1985 5 770 Nov-20-2023, 04:38 PM
Last Post: deanhystad
  newbie question - can't make code work tronic72 2 674 Oct-22-2023, 09:08 PM
Last Post: tronic72
  Why doesn't calling a parent constructor work with arbitrary keyword arguments? PurposefulCoder 4 928 Jun-24-2023, 02:14 PM
Last Post: deanhystad
  Code works but doesn't give the right results colin_dent 2 709 Jun-22-2023, 06:04 PM
Last Post: jefsummers
  Beginner: Code not work when longer list raiviscoding 2 814 May-19-2023, 11:19 AM
Last Post: deanhystad
  Why doesn't this code work? What is wrong with path? Melcu54 7 1,771 Jan-29-2023, 06:24 PM
Last Post: Melcu54
  Code used to work 100%, now sometimes works! muzicman0 5 1,427 Jan-13-2023, 05:09 PM
Last Post: muzicman0
  color code doesn't work harryvl 1 882 Dec-29-2022, 08:59 PM
Last Post: deanhystad
  Something the code dont work AlexPython 13 2,228 Oct-17-2022, 08:34 PM
Last Post: AlexPython

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020