Hi there,
I wrote this simple packet interceptor app using
I wrote this simple packet interceptor app using
pypacker
. when I run this code by just calling the start_monitor
function it works fine. However, when I run start_monitor()
function using multiprocessing library the MonitorTraffic().start_monitoring()
function is called (the log is printed on the screen) but it doesn't intercept the packets. No error is thrown. I thought the threading causes this problem, I have commented the related lines but the problem still persists.""" Interceptor example using ICMP Requirements: sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass """ # import time from pypacker import interceptor from pypacker.layer3 import ip import zmq import logging import threading from multiprocessing import current_process, Process import os logging.basicConfig(level=logging.DEBUG) class MonitorTraffic(object): def __init__(self, serv_ip='localhost', serv_port=7778): self.context = zmq.Context() self.req_socket = None self.req_port = serv_port self.server_ip = serv_ip self.client_id = 0 self.sent_flows = set() self.lock = threading.Lock() def connect_to_server(self): """ It connects the REQ socket to the server's REP socket :return: None """ logging.debug("[Traffic-insp] [REQ:conn_to_serv] Connecting to the server ....") self.req_socket = self.context.socket(zmq.REQ) req_conn_str = "tcp://{}:{}".format(self.server_ip, self.req_port) self.req_socket.connect(req_conn_str) def send_flow_info(self, flow_info): """ Sends the flow information to the server, and stores the info in the self.sent_flows. :param flow_info: flow_info = (src, dst) :return: none """ flow = { "client_id": self.client_id, "src": flow_info[0], "dst": flow_info[1] } self.req_socket.send_json(flow) self.req_socket.recv() # flow_info = () logging.debug('[TrafficMonitor] flow <%s> datatype %s' % (flow_info, type(flow_info))) self.lock.acquire() self.sent_flows.add(flow_info) # add it to the dictionary only after server received it. self.lock.release() logging.debug('[TrafficMonitor] sent_flow <%s> ' % self.sent_flows) def verdict_cb(self, ll_data, ll_proto_id, data, ctx): """ It is a call back function used by the Interceptor. :return: ACCPET verdict """ # ip1 = ip.IP(data) ip1 = ip.IP(data) logging.debug('\n[Traffic-inspector]---->Packet received:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s'% (ip1.src_s, ip1.dst_s, ip1)) # flow_info = (ip1.src_s, ip1.dst_s) # if flow_info not in self.sent_flows: # _t = threading.Thread(target=self.send_flow_info, args=(flow_info, )) # _t.start() return ip1.bin(), interceptor.NF_ACCEPT def start_monitoring(self, queue, client_id): """ Start the traffic monitoring. :param queue: queue numbers to be monitored :param client_id: id of client, used by server. :return: None """ logging.debug('[TrafficMonitor][start-monitor] current_proc<%s>, queue %s' % (current_process().name, queue)) self.client_id = client_id ictor = interceptor.Interceptor() ictor.start(self.verdict_cb, queue_ids=queue) def start_monitor(queue, id): monitor.start_monitoring(queue, id) if __name__ == '__main__': monitor = MonitorTraffic() while True: ans = input('pres y to monitor and x to exit') if ans == 'y': proc = Process(target=start_monitor, args=([0, 1, 2], 1)) proc.start() # start_monitor([0, 1, 2], 1) elif ans == 'x': os._exit(0)I have re-implemented the code as following:
from multiprocessing import Process #from class_dpi import Dpi import os """ Interceptor example using ICMP Requirements: sudo iptables -I OUTPUT -m ndpi --youtube -j NFQUEUE --queue-balance 0:2 --queue-bypass """ import time from multiprocessing import Process import os from pypacker import interceptor from pypacker.layer3 import ip class Dpi(object): # ICMP Echo request intercepting def verdict_cb(self, ll_data, ll_proto_id, data, ctx): # ip1 = ip.IP(data) ip1 = ip.IP(data) print('\n---------->Packet recieved:\n\t source ip:%s , the dst ip:%s, \n\npacket_info:%s ' %(ip1.src_s, ip1.dst_s, ip1)) return ip1.bin(), interceptor.NF_ACCEPT def dpi_start(self): ictor = interceptor.Interceptor() ictor.start(self.verdict_cb, queue_ids=[0, 1, 2]) #class PDpi(object): # def __init__(self): # self.dpi = Dpi() # def start_int(self): # proc = Process(target=self.dpi.dpi_start()) # proc.daemon = True # proc.start() def start_int(): proc = Process(target=Dpi().dpi_start()) #proc.daemon = True proc.start() if __name__ == '__main__': while True : a = input('press y to start: ') if a == 'y': start_int() elif a == 'x': os._exit(0)This code works but I can't figure out what is the problem with the original one (the first code). Please let me know what the problem could be. It is messing with my head. I would appreciate it if any one could point out the problem.