Python Forum
Dynamic updating of a nested dictionary in multiprocessing pool
#1
I have written a simple piece of code to understand how a lack of communication between the child processes leads to random results when using multiprocessing.Pool. I pass a nested dictionary, wrapped as a DictProxy object created by multiprocessing.Manager (see also the main code below):

manager = Manager()
my_dict = manager.dict()
my_dict['nested'] = nested
into a pool of 16 worker processes. The nested dictionary is defined in the main code below. The function my_function simply squares each number stored in the elements of the nested dictionary.

As expected, since threads share memory, I get the correct result when I use multiprocessing.dummy:

Output:
{0: 1, 1: 4, 2: 9, 3: 16} {0: 4, 1: 9, 2: 16, 3: 25} {0: 9, 1: 16, 2: 25, 3: 36} {0: 16, 1: 25, 2: 36, 3: 49} {0: 25, 1: 36, 2: 49, 3: 64}
but when I use multiprocessing, the result is incorrect and differs randomly on each run. One example of an incorrect result is:

Output:
{0: 1, 1: 2, 2: 3, 3: 4} {0: 4, 1: 9, 2: 16, 3: 25} {0: 3, 1: 4, 2: 5, 3: 6} {0: 16, 1: 25, 2: 36, 3: 49} {0: 25, 1: 36, 2: 49, 3: 64}
In this particular run, the 'data' in elements 1 and 3 was not updated. I understand that this happens because the child processes do not communicate, which prevents the "updated" nested dictionary in each child process from being properly sent to the others. However, is it possible to use Manager.Queue to organize this inter-child communication and get the correct results, ideally with minimal run-time?

from multiprocessing import Pool, Manager
import numpy as np

def my_function(A):
    arg1 = A[0]
    my_dict = A[1]
    
    temporary_dict = my_dict['nested']
    for arg2 in np.arange(len(my_dict['nested']['elements'][arg1]['data'])):
        temporary_dict['elements'][arg1]['data'][arg2] = temporary_dict['elements'][arg1]['data'][arg2] ** 2 
        
    my_dict['nested'] = temporary_dict
    

if __name__ == '__main__':
    
    
    # nested dictionary definition
    strs1 = {}
    strs2 = {}
    strs3 = {}
    strs4 = {}
    strs5 = {}
    strs1['data'] = {}
    strs2['data'] = {}
    strs3['data'] = {}
    strs4['data'] = {}
    strs5['data'] = {}
    
    for i in [0,1,2,3]:
        strs1['data'][i] = i + 1
        strs2['data'][i] = i + 2
        strs3['data'][i] = i + 3
        strs4['data'][i] = i + 4
        strs5['data'][i] = i + 5
    
    nested = {}
    nested['elements'] = [strs1, strs2, strs3, strs4, strs5]
    nested['names'] = ['series1', 'series2', 'series3', 'series4', 'series5']
    
    # parallel processing
    pool = Pool(processes = 16)
    manager = Manager()
    my_dict = manager.dict()
    my_dict['nested'] = nested

    sequence = np.arange(len(my_dict['nested']['elements']))
    pool.map(my_function, ([seq,my_dict] for seq in sequence))
    
    pool.close()
    pool.join()
    
    # printing the data in all elements of the nested dictionary
    print(my_dict['nested']['elements'][0]['data'])
    print(my_dict['nested']['elements'][1]['data'])
    print(my_dict['nested']['elements'][2]['data'])
    print(my_dict['nested']['elements'][3]['data'])
    print(my_dict['nested']['elements'][4]['data'])
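For what it's worth, the lost update can be reproduced without any processes at all: the failure mode is a plain read-modify-write race on the whole nested dictionary. Here is an illustrative sketch (not part of the program above) with two simulated "workers" that each read a full copy, modify their own slot, and write the whole dictionary back:

```python
# Sequential demonstration of the lost-update race: both workers read
# the full dict before either writes, so the second write-back clobbers
# the first worker's update.
shared = {'nested': {'a': 1, 'b': 2}}

copy1 = dict(shared['nested'])   # worker 1 reads a full copy
copy2 = dict(shared['nested'])   # worker 2 reads before worker 1 writes

copy1['a'] = copy1['a'] ** 2     # worker 1 updates its own slot
copy2['b'] = copy2['b'] ** 2     # worker 2 updates its own slot

shared['nested'] = copy1         # worker 1 writes the whole dict back
shared['nested'] = copy2         # worker 2 overwrites -> 'a' update lost

print(shared['nested'])          # {'a': 1, 'b': 4}
```

This is exactly what happens in my_function: `temporary_dict = my_dict['nested']` pulls a full copy out of the proxy, and `my_dict['nested'] = temporary_dict` writes the whole thing back, so concurrent workers overwrite each other's elements.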
#2
It works if you write
[pool.apply(my_function, args=[[seq,my_dict]]) for seq in sequence]
instead of your pool.map(...) line. Since pool.apply blocks until each call finishes, the workers never write the shared dictionary concurrently.

Still, it would be nice to find a way with apply_async, but I haven't managed that so far.