Python Forum
Help with multiprocessing
#1
I am fairly new to Python and have run into a snag that I can't get past. I am trying to share a string variable between two processes. One process captures MQTT messages; I want the other
process to manipulate the data that is passed to it. Sharing data between these two processes has proven very difficult. Can you help? Thanks.
import multiprocessing
import time
import paho.mqtt.client as mqtt
import queue
q=queue.Queue()

def main2():
    data = "start in main2"
    print(data)
    q.put(data)

def do_mqtt():
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("192.168.1.31", 1883)
    client.loop_forever()
    
def on_connect(client, userdata, flags,rc):
    print("Connected with Code:" +str(rc))
    # Subscribe Topic
    client.subscribe("mqtt/master/#")

def on_message( client, userdata, msg):
    print ( str(msg.payload))
    data = "on_message:" +str(msg.payload)
    print(data)
    q.put(1,data)
    
def do_it():
    print("Number of cpu : ", multiprocessing.cpu_count())
    while q:
        print(q.get())
    
if __name__ == "__main__":
   
    p1 = multiprocessing.Process(target=do_it(), args=())
    p2 = multiprocessing.Process(target=do_mqtt(), args=())
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("starting Main ")
    while q:
        print(q.get())
#2
Please describe the exact issue you're experiencing.
From the code, I foresee a problem on lines 41 and 42. Process.join() blocks execution until the process terminates, meaning until its target function returns. Neither function used in p1 or p2 ever returns: do_it() loops on "while q:", which is always true for a Queue object, and do_mqtt() ends in client.loop_forever(). As such, the processes do not terminate and p1.join() will block forever.
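To make the join() behaviour concrete, here is a minimal sketch. The names short_task, endless_task and finished_within are made up for illustration and are not part of your program. It waits on each process with a timeout so the script cannot hang, and it passes the function object itself as target; writing target=short_task() would call the function in the parent before the Process is even created.

```python
import multiprocessing
import time


def short_task():
    """Do a little work and return, so the process can terminate."""
    time.sleep(0.1)


def endless_task():
    """Never returns, much like a call to loop_forever()."""
    while True:
        time.sleep(0.1)


def finished_within(process, timeout):
    """Start the process and wait up to `timeout` seconds for it to end."""
    process.start()
    process.join(timeout)    # returns when the process ends or the timeout expires
    if process.is_alive():   # still running, so join() only gave up waiting
        process.terminate()
        process.join()
        return False
    return True


if __name__ == "__main__":
    # target is the function object; no parentheses after the name.
    quick = multiprocessing.Process(target=short_task)
    stuck = multiprocessing.Process(target=endless_task)
    print("short_task finished:", finished_within(quick, 5.0))
    print("endless_task finished:", finished_within(stuck, 1.0))
```

The first process should end on its own, while the second has to be terminated because its target never returns. A plain join() without a timeout on that second process would wait indefinitely.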
#3
To keep the program running fast, I wanted to have two processes running in parallel. The first one will be responsible for receiving MQTT messages. The second process is to manipulate the messages. Therefore it is necessary to pass data (the MQTT messages) from the first process to the second. Thanks.
#4
Yes, I understand that. What exactly isn't working? Is the program halting like I suspect, is it terminating prematurely, or is it something else?
#5
Yes, I am trying to learn about multiprocessing. As you have pointed out, the process that receives the MQTT messages is a blocking process. It does stop the other processes from running. I thought the main problem I was having had to do with sharing the messages using a queue. Can you help me with running a process that contains a blocking loop?
#6
After looking over it again, the join() calls probably aren't the problem. Rather, it's likely the Queue that's the culprit. Multiprocessing runs separate processes, which normally are not able to share objects. This is because each process has its own memory space; this contrasts with multithreading, where all threads run inside a single process and share its memory. queue.Queue is meant for threads, so each process ends up working with its own independent copy of q.

However, the multiprocessing module provides utilities to get around that, such as multiprocessing.Queue. Check out this GeeksforGeeks article about multiprocessing for possible solutions.
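As a rough sketch of one such utility, the example below uses multiprocessing.Queue to hand strings from one process to another. The producer stands in for your MQTT process and just sends a fixed list; the SENTINEL marker, the function names and the upper() "manipulation" are placeholders I made up, not anything from your program.

```python
import multiprocessing

SENTINEL = None  # hypothetical end-of-stream marker


def producer(q, messages):
    """Stand-in for the MQTT process: put each message on the shared queue."""
    for msg in messages:
        q.put(msg)
    q.put(SENTINEL)  # tell the consumer that no more data is coming


def consumer(q):
    """Read from the queue until the sentinel arrives; return the processed items."""
    processed = []
    while True:
        item = q.get()  # blocks until an item is available
        if item is SENTINEL:
            break
        print("consumer got:", item)
        processed.append(item.upper())  # placeholder for the real manipulation
    return processed


if __name__ == "__main__":
    # Create the queue in the parent and pass it to both children through args.
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=consumer, args=(q,))
    p2 = multiprocessing.Process(target=producer, args=(q, ["on", "off"]))
    p1.start()
    p2.start()
    p2.join()
    p1.join()
```

The queue is created once in the parent and handed to each child explicitly, rather than living as a module-level queue.Queue. The sentinel gives the consumer a way to leave its loop, so both join() calls can return.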
#7
Have you been able to make any progress on this? I am facing the exact same problem.

The only initial suggestion I have for you is to try
client.loop_start()
in place of
client.loop_forever()
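Here is a rough sketch of what I mean; I have not run it against a real broker. It assumes the paho-mqtt 1.x style mqtt.Client() constructor from the first post (paho-mqtt 2.x changed the constructor to take a callback API version, so adjust for that release). The name mqtt_worker and the run_seconds parameter are made up for illustration. The queue is handed to the callbacks through user_data_set(), and loop_start() runs the network loop in a background thread instead of blocking the caller.

```python
import multiprocessing
import time

TOPIC = "mqtt/master/#"  # topic taken from the original post


def on_connect(client, userdata, flags, rc):
    """paho-mqtt 1.x style callback: subscribe once the broker accepts us."""
    client.subscribe(TOPIC)


def on_message(client, userdata, msg):
    """userdata is the queue registered with user_data_set() below."""
    userdata.put(msg.payload.decode("utf-8", errors="replace"))


def mqtt_worker(q, host, port, run_seconds):
    """Meant to run in a child process: receive messages and queue them."""
    # Third-party import kept local so the module loads without paho installed.
    import paho.mqtt.client as mqtt

    client = mqtt.Client()       # assumption: paho-mqtt 1.x constructor
    client.user_data_set(q)      # makes q available to the callbacks
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(host, port)
    client.loop_start()          # background thread; returns immediately
    try:
        time.sleep(run_seconds)  # this process is free to do other work here
    finally:
        client.loop_stop()
        client.disconnect()
```

It would be started from under the usual if __name__ == "__main__": guard with something like multiprocessing.Process(target=mqtt_worker, args=(q, "192.168.1.31", 1883, 60)), where q is a multiprocessing.Queue that a second process reads from.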