Python Forum
Multiprocessing doesn't work in this code
#1
Hi there,
I wrote this simple packet interceptor app using pypacker. When I run the code by calling the start_monitor function directly, it works fine. However, when I run start_monitor() through the multiprocessing library, MonitorTraffic().start_monitoring() is called (the log line is printed on the screen) but it doesn't intercept any packets. No error is thrown. I thought the threading might be causing this, so I commented out the related lines, but the problem persists.


    """
    Interceptor example using ICMP
    
    Requirements:
    sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass
    """
    # import time
    
    from pypacker import interceptor
    from pypacker.layer3 import ip
    import zmq
    import logging
    import threading
    from multiprocessing import current_process, Process
    import os
    logging.basicConfig(level=logging.DEBUG)
    
    
    class MonitorTraffic(object):
        def __init__(self, serv_ip='localhost', serv_port=7778):
            self.context = zmq.Context()
            self.req_socket = None
    
            self.req_port = serv_port
            self.server_ip = serv_ip
            self.client_id = 0
            self.sent_flows = set()
            self.lock = threading.Lock()
    
        def connect_to_server(self):
            """
                It connects the REQ socket to the server's REP socket
            :return: None
            """
            logging.debug("[Traffic-insp] [REQ:conn_to_serv] Connecting to the server ....")
            self.req_socket = self.context.socket(zmq.REQ)
            req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port)
            self.req_socket.connect(req_conn_str)
    
        def send_flow_info(self, flow_info):
            """
                Sends the flow information to the server, and stores the info in the self.sent_flows.
            :param flow_info: flow_info = (src, dst)
            :return: none
            """
            flow = {
                "client_id": self.client_id,
                "src": flow_info[0],
                "dst": flow_info[1]
            }
            self.req_socket.send_json(flow)
            self.req_socket.recv()
            # flow_info = ()
            logging.debug('[TrafficMonitor] flow <%s> datatype %s' % (flow_info, type(flow_info)))
            with self.lock:
                # Add it to the set only after the server has received it.
                self.sent_flows.add(flow_info)
            logging.debug('[TrafficMonitor] sent_flow <%s> ' % self.sent_flows)
    
        def verdict_cb(self, ll_data, ll_proto_id, data, ctx):
            """
                It is a call back function used by the Interceptor.
            :return: ACCEPT verdict
            """
            # ip1 = ip.IP(data)
            ip1 = ip.IP(data)
    
            logging.debug('\n[Traffic-inspector]---->Packet received:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s'%
                           (ip1.src_s, ip1.dst_s, ip1))
    
            # flow_info = (ip1.src_s, ip1.dst_s)
            # if flow_info not in self.sent_flows:
            #     _t = threading.Thread(target=self.send_flow_info, args=(flow_info, ))
            #     _t.start()
    
            return ip1.bin(), interceptor.NF_ACCEPT
    
        def start_monitoring(self, queue, client_id):
            """
                Start the traffic monitoring.
            :param queue: queue numbers to be monitored
            :param client_id: id of client, used by server.
            :return: None
            """
            logging.debug('[TrafficMonitor][start-monitor] current_proc<%s>, queue %s' % (current_process().name, queue))
            self.client_id = client_id
            ictor = interceptor.Interceptor()
            ictor.start(self.verdict_cb, queue_ids=queue)
    
    
    def start_monitor(queue, id):
        monitor.start_monitoring(queue, id)
    
    
    if __name__ == '__main__':
        monitor = MonitorTraffic()
        while True:
            ans = input('press y to monitor and x to exit')
            if ans == 'y':
                proc = Process(target=start_monitor, args=([0, 1, 2], 1))
                proc.start()
                # start_monitor([0, 1, 2], 1)

            elif ans == 'x':
                os._exit(0) 
I have re-implemented the code as follows:


    from multiprocessing import Process
    #from class_dpi import Dpi
    import os
    
    """
    Interceptor example using ICMP
    
    Requirements:
    sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass
    """
    import time
    from multiprocessing import Process
    import os
    from pypacker import interceptor
    from pypacker.layer3 import ip
    
    class Dpi(object):
    	# ICMP Echo request intercepting
    	def verdict_cb(self, ll_data, ll_proto_id, data, ctx):
    		# ip1 = ip.IP(data)
    		ip1 = ip.IP(data)
    
    
    		print('\n---------->Packet received:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s '
    			%(ip1.src_s, ip1.dst_s, ip1))
    		return ip1.bin(), interceptor.NF_ACCEPT
    
    	def dpi_start(self):
    		ictor = interceptor.Interceptor()
    		ictor.start(self.verdict_cb, queue_ids=[0, 1, 2])
    
    
    #class PDpi(object):
    #	def __init__(self):
    #		self.dpi = Dpi()
    #	def start_int(self):
    #		proc = Process(target=self.dpi.dpi_start())
    #		proc.daemon = True
    #		proc.start()
    
    def start_int():
    		proc = Process(target=Dpi().dpi_start())
    		#proc.daemon = True
    		proc.start()
    		
    		
    if __name__ == '__main__':
    	while True :
    		a = input('press y to start: ')
    		if a == 'y':
    		    start_int()			
    
    		elif a == 'x':
    			os._exit(0)
This code works, but I can't figure out what the problem is with the original one (the first code). Please let me know what the problem could be. It is messing with my head. I would appreciate it if anyone could point out the problem.
#2
This has nothing to do with your problem (at least I don't believe it does), but
you have duplicate code here:
from multiprocessing import Process
#from class_dpi import Dpi
import os
#3
(Dec-23-2017, 11:38 PM)Larz60+ Wrote: This has nothing to do with your problem (at least I don't believe it does), but
you have duplicate code here:
from multiprocessing import Process
#from class_dpi import Dpi
import os
Thanks, you are right, it was duplicated.

Does anyone know a way to diagnose such problems? I have no clue where to start debugging this code...
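As a starting point for this kind of diagnosis, here is a minimal sketch that only uses the standard library. The helper names `run_logged`, `describe` and `faulty_worker` are made up for illustration and are not part of pypacker or of the code above. The idea is to log which process actually runs the target, log any exception raised inside it, and then check from the parent whether the child is still alive or what exit code it returned.

```python
import logging
import multiprocessing


def run_logged(func, *args):
    """Run func(*args), logging the process name and any exception raised."""
    name = multiprocessing.current_process().name
    logging.debug("[%s] starting %s%r", name, func.__name__, args)
    try:
        return func(*args)
    except Exception:
        # Log the full traceback, then re-raise so the exit code is non-zero.
        logging.exception("[%s] %s failed", name, func.__name__)
        raise


def describe(proc):
    """Summarise the state of a multiprocessing.Process as a short string."""
    if proc.is_alive():
        return "{} is still running".format(proc.name)
    if proc.exitcode is None:
        return "{} has not been started".format(proc.name)
    return "{} exited with code {}".format(proc.name, proc.exitcode)


def faulty_worker(queue_ids):
    """Hypothetical stand-in for a target that fails inside the child."""
    raise RuntimeError("simulated failure for queues {}".format(queue_ids))


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    # Ask multiprocessing itself to report process start-up and shutdown.
    multiprocessing.log_to_stderr(logging.INFO)
    proc = multiprocessing.Process(
        target=run_logged, args=(faulty_worker, [0, 1, 2]))
    proc.start()
    proc.join(timeout=5)
    print(describe(proc))
```

If the child reports a clean exit almost immediately, the target probably returned without blocking; if it is still running but nothing is intercepted, the problem is more likely inside the child's own state (sockets, threads or handles created before the process was started).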
#4
Just an idea: are you running your program from IDLE or another Integrated Development Environment tool? IDLE doesn't support multiprocessing, if I remember correctly. Try running your code directly from the console.
#5
(Dec-24-2017, 10:20 AM)squenson Wrote: Just an idea: are you running your program from IDLE or another Integrated Development Environment tool? IDLE doesn't support multiprocessing, if I remember correctly. Try running your code directly from the console.

I run the code from the terminal. The second multiprocessing version runs as expected, but the first one does not.
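One detail may explain why the second version appears to work: `Process(target=Dpi().dpi_start())` has parentheses after `dpi_start`, so Python calls the method immediately, in the parent process, and hands its return value to `Process` as the target. That would make the second version equivalent to the direct call that already worked, rather than evidence that the interceptor runs in a child process. The sketch below shows the difference with a stand-in function; `work`, `build_wrong`, `build_right` and `ran_in` are hypothetical names used only for this illustration.

```python
import os
from multiprocessing import Process

# PIDs recorded by work() in the memory of the process that ran it.
ran_in = []


def work():
    """Stand-in for dpi_start(): record and print the PID that runs it."""
    ran_in.append(os.getpid())
    print("work() running in pid", os.getpid())


def build_wrong():
    # The parentheses call work() right here, in the calling process.
    # Process then receives its return value (None) as the target.
    return Process(target=work())


def build_right():
    # Pass the function object itself; it only runs after start(),
    # inside the child process.
    return Process(target=work)


if __name__ == "__main__":
    print("parent pid", os.getpid())

    proc = build_wrong()   # work() prints the parent's PID here
    proc.start()           # the child has no target and does nothing
    proc.join()

    proc = build_right()   # nothing is printed yet
    proc.start()           # work() prints the child's PID
    proc.join()
```

With the parentheses removed, the second version would start the interceptor in a child process just like the first one, so both versions could then be compared on equal terms.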

