Python Forum
multiprocessing and sharing object
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
multiprocessing and sharing object
#1
Hi,

I want to define nested objects that can be read and written by several processes created with the multiprocessing module.
I have searched several sites, but the problem does not seem to have been solved.

Can you help me find a solution?

An example of the structure to be solved is below: the nested object (the "subclass") does not seem to be treated as a shared object and is not updated, whereas the first-level object is.
=> The question concerns complex objects that link to other structures such as objects, dicts or function references.
=> It's a real problem to know how to define shareable objects.


from multiprocessing import Lock, Process, Queue, current_process, Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import time
import queue # imported for using queue.Empty exception

class MyManager(BaseManager):
    """Manager subclass that carries this script's own type registrations.

    It adds nothing to BaseManager; it exists so that register() calls made
    on it stay separate from the BaseManager class itself.
    """

def Manager():
    """Build a MyManager, start it and return the started instance."""
    started_manager = MyManager()
    started_manager.start()
    return started_manager

class ConfAppSubClass:
    """Nested value holder stored inside a ConfApp instance.

    Keeps a single value, ``subid``, which put() and sub() both advance by
    ``1000 + id``.
    """

    def __init__(self, id):
        """Store ``id`` as the initial ``subid``."""
        self.subid = id

    def _advance(self, amount):
        """Apply the update shared by put() and sub()."""
        self.subid = 1000 + self.subid + amount

    def run(self):
        """Placeholder (still to be implemented); does nothing."""
        return None

    def get(self):
        """Return the current ``subid``."""
        return self.subid

    def put(self, id):
        """Set ``subid`` to ``1000 + subid + id``."""
        self._advance(id)

    def sub(self, id):
        """Set ``subid`` to ``1000 + subid + id`` (same update as put())."""
        self._advance(id)

class ConfApp:
    """Top-level configuration object that owns one nested ConfAppSubClass.

    Keeps a value, ``id``, which put() and sub() both advance by
    ``2000 + id``, plus a nested object created with ``10 * id``.
    """

    def __init__(self, id):
        """Store ``id`` and create the nested object from ``10 * id``."""
        self.id = id
        self.subclass = ConfAppSubClass(10 * id)

    def _advance(self, amount):
        """Apply the update shared by put() and sub()."""
        self.id = 2000 + self.id + amount

    def run(self):
        """Placeholder (still to be implemented); does nothing."""
        return None

    def get(self):
        """Return the current ``id``."""
        return self.id

    def getSubClass(self):
        """Return the nested ConfAppSubClass instance."""
        return self.subclass

    def put(self, id):
        """Set ``id`` to ``2000 + id (current) + id (argument)``."""
        self._advance(id)

    def sub(self, id):
        """Set ``id`` to ``2000 + id (current) + id (argument)``, like put()."""
        self._advance(id)
        
def do_job_tx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_out):
    """Consume tasks until the queue is empty, updating the conf object per task.

    For every task taken from ``tasks_to_accomplish`` a completion message is
    put on ``tasks_that_are_done``, the conf object is printed, and its
    ``put(10000)`` and ``getSubClass().put(3000)`` methods are called.

    Args:
        tasks_to_accomplish: queue of task name strings to consume.
        tasks_that_are_done: queue receiving one completion message per task.
        tasks_conf_object_out: object offering get(), put() and getSubClass().

    Returns:
        True, once get_nowait() reports an empty task queue.
    """
    while True:
        try:
            # get_nowait() raises queue.Empty when no task is available;
            # get(False) would behave the same way.
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:
            return True

        # A task was obtained: report it on the "done" queue.
        print(task)
        tasks_that_are_done.put(task + ' is done by ' + current_process().name)

        conf = tasks_conf_object_out
        print("Task no %s : out conf obj %s " % (task, conf))
        print("Task no %s : out conf obj val1 %s " % (task, conf.get()))
        conf.put(10000)
        # NOTE(review): when conf is a manager proxy (as passed by main),
        # getSubClass() presumably returns a copy of the nested object, so
        # this update may not reach the shared instance -- confirm.
        conf.getSubClass().put(3000)
        print("Task no %s : out conf obj val2 %s " % (task, conf.get()))
        print("%s object created as output" % task)
        time.sleep(.5)

def do_job_rx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_in):
    """Consume tasks until the queue is empty, updating the conf object per task.

    For every task taken from ``tasks_to_accomplish`` a completion message is
    put on ``tasks_that_are_done``, the conf object is printed, and its
    ``sub(500)`` and ``getSubClass().sub(200)`` methods are called.

    Args:
        tasks_to_accomplish: queue of task name strings to consume.
        tasks_that_are_done: queue receiving one completion message per task.
        tasks_conf_object_in: object offering get(), sub() and getSubClass().

    Returns:
        True, once get_nowait() reports an empty task queue.
    """
    while True:
        try:
            # get_nowait() raises queue.Empty when no task is available;
            # get(False) would behave the same way.
            task = tasks_to_accomplish.get_nowait()
        except queue.Empty:
            return True

        # A task was obtained: report it on the "done" queue.
        print(task)
        tasks_that_are_done.put(task + ' is done by ' + current_process().name)

        conf = tasks_conf_object_in
        print("Task no %s : out conf obj %s " % (task, conf))
        print("Task no %s : out conf obj val1 %s " % (task, conf.get()))
        conf.sub(500)
        # NOTE(review): when conf is a manager proxy (as passed by main),
        # getSubClass() presumably returns a copy of the nested object, so
        # this update may not reach the shared instance -- confirm.
        conf.getSubClass().sub(200)
        print("Task no %s : out conf obj val2 %s " % (task, conf.get()))
        time.sleep(.5)

def _run_phase(worker, task_names, number_of_processes,
               tasks_to_accomplish, tasks_that_are_done, conf_obj):
    """Queue the tasks, run them on fresh worker processes and print results."""
    for task_name in task_names:
        tasks_to_accomplish.put(task_name)

    # A new list per phase, so only this phase's workers are joined.
    processes = []
    for _ in range(number_of_processes):
        p = Process(target=worker,
                    args=(tasks_to_accomplish, tasks_that_are_done, conf_obj))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    # Print the completion messages produced by the workers of this phase.
    # Queue.empty() is documented as approximate; it is used here only after
    # every worker has been joined.
    while not tasks_that_are_done.empty():
        print(tasks_that_are_done.get())

    print("MyConfObj: val is %s " % (str(conf_obj.get())))
    # getSubClass() is not listed in any method_to_typeid mapping, so the
    # manager returns the nested object by value; this prints a snapshot of
    # the nested state held by the manager process.
    print("MyConfObj subclass: val is %s " % (str(conf_obj.getSubClass().get())))


def main():
    """Run the shared-object demo in two phases.

    Registers ConfApp and ConfAppSubClass on MyManager, creates one shared
    ConfApp(5) through the manager, then processes ``number_of_task`` tasks
    with ``number_of_processes`` workers twice: first with do_job_tx, then
    with do_job_rx. After each phase the completion messages and the values
    of the shared object and its nested object are printed.

    Returns:
        True once both phases have completed.
    """
    number_of_task = 10
    number_of_processes = 4
    tasks_to_accomplish = Queue()
    tasks_that_are_done = Queue()

    MyManager.register('ConfApp', ConfApp)
    MyManager.register('ConfAppSubClass', ConfAppSubClass)
    manager = Manager()

    try:
        MyConfObj = manager.ConfApp(5)

        # Derive the task list from number_of_task instead of a fixed-size
        # buffer, so changing the count cannot overrun it.
        task_names = ["Task no " + str(i) for i in range(number_of_task)]

        # Show the full set of tasks that is about to be queued.
        print(str(task_names))

        for worker in (do_job_tx, do_job_rx):
            _run_phase(worker, task_names, number_of_processes,
                       tasks_to_accomplish, tasks_that_are_done, MyConfObj)
    finally:
        # Stop the manager's server process explicitly rather than relying
        # on interpreter exit.
        manager.shutdown()

    return True


# Run the demo only when this file is executed as a script. main() starts
# processes, and the multiprocessing guidelines require the main module to be
# safely importable (no process creation at import time) under the spawn and
# forkserver start methods.
if __name__ == '__main__':
    main()
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Best way to secure API key when sharing quarinteen 2 341 Jan-19-2024, 04:46 PM
Last Post: deanhystad
  How to install modules for 2.7 (or sharing from 3.8)) Persisto 2 2,431 Dec-31-2021, 02:33 PM
Last Post: Persisto
  Multiprocessing Can't pickle local object law 1 15,985 Aug-30-2021, 02:49 PM
Last Post: law
  sharing variables between two processes Kiyoshi767 1 1,869 Nov-07-2020, 04:00 AM
Last Post: ndc85430
  Sharing my code emirasal 2 2,038 Oct-04-2020, 02:21 PM
Last Post: emirasal
  Sharing X Axis in Sub plots JoeDainton123 1 2,179 Aug-22-2020, 04:11 AM
Last Post: deanhystad
  Multiprocessing, class, run and a Queue Object SeanInColo 0 1,535 Jul-12-2020, 05:36 PM
Last Post: SeanInColo
  Instances sharing attributes midarq 4 2,474 Sep-20-2019, 11:13 AM
Last Post: midarq
  How to sharing object between multiple process from main process using Pipe Subrata 1 3,657 Sep-03-2019, 09:49 PM
Last Post: woooee
  Sharing variables across modules j.crater 4 3,448 Jul-30-2018, 09:09 PM
Last Post: j.crater

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020