Hi,
I want to define nested objects that can be read/written by several processes defined by multiprocessing module.
I search on some networks but it seems to not be solved.
Can you help me to find a solution.
Example of structure below to be solved : the subclass seems to not be seen as a shared object and is not updated, whereas the first level object is.
=> the question is for complex object that linked strcutures as objects, dict or functions ref.
=> it's a real problem to know how to define shareable objects.
I want to define nested objects that can be read/written by several processes defined by multiprocessing module.
I search on some networks but it seems to not be solved.
Can you help me to find a solution.
Example of structure below to be solved : the subclass seems to not be seen as a shared object and is not updated, whereas the first level object is.
=> the question is for complex object that linked strcutures as objects, dict or functions ref.
=> it's a real problem to know how to define shareable objects.
from multiprocessing import Lock, Process, Queue, current_process, Pool from multiprocessing.managers import BaseManager, NamespaceProxy import time import queue # imported for using queue.Empty exception class MyManager(BaseManager): pass def Manager(): m = MyManager() m.start() return m class ConfAppSubClass(): def __init__(self,id): # todo self.subid = id def run(self): # todo pass def get(self): # todo return self.subid def put(self,id): # todo self.subid = 1000 + self.subid + id def sub(self,id): # todo self.subid = 1000 + self.subid + id class ConfApp(): def __init__(self,id): # todo self.id = id self.subclass = ConfAppSubClass(10*id) def run(self): # todo pass def get(self): # todo return self.id def getSubClass(self): # todo return self.subclass def put(self,id): # todo self.id = 2000 + self.id + id def sub(self,id): # todo self.id = 2000 + self.id + id def do_job_tx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_out): while True: try: ''' try to get task from the queue. get_nowait() function will raise queue.Empty exception if the queue is empty. queue(False) function would do the same task also. ''' task = tasks_to_accomplish.get_nowait() except queue.Empty: break else: ''' if no exception has been raised, add the task completion message to task_that_are_done queue ''' print(task) tasks_that_are_done.put(task + ' is done by ' + current_process().name) ConfObjIf = tasks_conf_object_out print("Task no %s : out conf obj %s "% (task, str(ConfObjIf))) print("Task no %s : out conf obj val1 %s "% (task, str(ConfObjIf.get()))) ConfObjIf.put(10000) ConfObjIf.getSubClass().put(3000) print("Task no %s : out conf obj val2 %s "% (task, str(ConfObjIf.get()))) print("%s object created as output" % task) time.sleep(.5) return True def do_job_rx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_in): while True: try: ''' try to get task from the queue. get_nowait() function will raise queue.Empty exception if the queue is empty. queue(False) function would do the same task also. ''' task = tasks_to_accomplish.get_nowait() except queue.Empty: break else: ''' if no exception has been raised, add the task completion message to task_that_are_done queue ''' print(task) tasks_that_are_done.put(task + ' is done by ' + current_process().name) ConfObjIf = tasks_conf_object_in print("Task no %s : out conf obj %s "% (task, str(ConfObjIf))) print("Task no %s : out conf obj val1 %s "% (task, str(ConfObjIf.get()))) ConfObjIf.sub(500) ConfObjIf.getSubClass().sub(200) print("Task no %s : out conf obj val2 %s "% (task, str(ConfObjIf.get()))) time.sleep(.5) return True def main(): number_of_task = 10 number_of_processes = 4 tasks_to_accomplish = Queue() tasks_that_are_done = Queue() tasks_object_out = Queue() processes = [] MyManager.register('ConfApp', ConfApp) MyManager.register('ConfAppSubClass', ConfAppSubClass) manager = Manager() MyConfObj = manager.ConfApp(5); for i in range(number_of_task): tasks_to_accomplish.put("Task no " + str(i)) # print tasks_to_accomplish entire queue temp = [None,None,None,None,None,None,None,None,None,None] for i in range(number_of_task): temp[i] = tasks_to_accomplish.get() for i in range(number_of_task): tasks_to_accomplish.put(temp[i]) tasks_object_out.put(MyConfObj) #### print(str(temp)) # creating processes for w in range(number_of_processes): p = Process(target=do_job_tx, args=(tasks_to_accomplish, tasks_that_are_done, MyConfObj)) processes.append(p) p.start() # completing process for p in processes: p.join() # print the output : info on processes end while not tasks_that_are_done.empty(): print(tasks_that_are_done.get()) print("MyConfObj: val is %s "% (str(MyConfObj.get()))) print("MyConfObj subclass: val is %s "% (str(MyConfObj.getSubClass().get()))) for i in range(number_of_task): tasks_to_accomplish.put("Task no " + str(i)) tasks_object_out.put(MyConfObj) #### # creating processes for w in range(number_of_processes): p = Process(target=do_job_rx, args=(tasks_to_accomplish, tasks_that_are_done, MyConfObj)) processes.append(p) p.start() # completing process for p in processes: p.join() # print the output : info on processes end while not tasks_that_are_done.empty(): print(tasks_that_are_done.get()) print("MyConfObj: val is %s "% (str(MyConfObj.get()))) print("MyConfObj subclass: val is %s "% (str(MyConfObj.getSubClass().get()))) return True if __name__ == '__main__': main()