Python Forum
Storing DictProxy object and modifying its nested value?
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Storing DictProxy object and modifying its nested value?
#1
The standard multiprocessing.managers.BaseManager module does not permit or recognize the use of nested objects, forcing one to resort to multiprocessing.Manager().dict() module. This module must also be used again on the nested keys of a dictionary.

My aim is to store and isolate/compartmentalize these DictProxy to a separate class, that will only have one instance through out the execution of the program. The purpose of the dictionary is to keep track of what tasks have been completed, so that a certain type of command may be freely executed by any of the child processes, waiting for more tasks, regardless.

The code in question success fully stores and loads the contents of the manager dictionary. Unforunately problem arises when one tries to modify the value of a nested key value. To make matters worse, these are intermittent errors. I.e. on separate occassion it will cease execution a few statements before (like 5 lines in this case), than the last time it was executed, when not a single change is made to the code.

I have re-written the program in question, to be much more simple and executed as its would on my own computer, without need to import any text file and such. I've also tried a BaseManager class, and same issue occurs.
`class MyManager(multiprocessing.managers.BaseManager):
pass`
To keep things simpler, the value in question is just a char, instead of a list, [0, 0], since I may need to implement Manager.List.

import multiprocessing
import multiprocessing.managers
import os, sys
import pprint

class ProgressDictionary:
    import multiprocessing
    # Defining a second 'multiprocessing' module, may be the cause of
    # AttributeError: 'ForkAwareLocal' object has no attribute 'connection' error
    def __init__(self):
        self.dictionary = {}
        # E.g.    k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                               'Nested key A_2': [1, 0],
        #                                               'Nested key A_3': [1, 0],
        #                                               'Nested key A_4': [1, 0]})}
        self.mprocDictionary = multiprocessing.Manager().dict()
        # Will need to be rebuilt each time self.dictionary is .updated()
        # E.g. mprocDictionary.update(<......>))
        #         = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key A_4': [1, 0]}),
        #                          'Root Key B': Manager().dict({'Nested key B_1': [1, 0],
        #                                                                   ...
        #                                                        'Nested key B_4': [1, 0]})}
        self.container = multiprocessing.Manager().dict()   # Not used

    def buildMprocDict(self, rootKey, listOfNestedKeys):
        self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)})
        # Debug
        # pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary)
        # No braces as intended   -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0>
        self.mprocDictionary.update(self.dictionary)
        # Debug
        # print(type(self.mprocDictionary))
        # {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>,
        #  'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>}
        # type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'>

        # Not used
        self.container.update({i : multiprocessing.Manager().dict()
                                    for i in self.mprocDictionary.keys()})
        # Debug
        # for k, v in self.mprocDictionary.items():
        #     print(f'{k} ---:--- {v}')
        # print(f'K43: {[i for i in self.mprocDictionary.keys()]}')
        # ['Root Key A', 'Root Key B']
        # print('L26: class function exit')
    def addNest(self, listOfNestedKeys):
        # Encapsulate nested dicts
        return self.multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
    def traverseNestedKeyOnly(self, rootKey, nestedKey):
        for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()):
            if k == nestedKey:
                return k
    def traverseRootAndNestedKey(self, nestedKey):
        print(f'L54')
        for k, v in dict(self.mprocDictionary).items():  #File "<string>", line 2, in __getitem__
            for k2, v2 in v.items():    # We only know value, but not key, hence roots is also traversed
                # In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str'
                if k2 == nestedKey:
                    return [k, k2]
        #if nested key is not found due to being deleted earlier for whatever reason
        return None
    def setMemAddress(self, nestedKey, pid, debug_helper):
        # *args used, incase root key is never supplied.
        print(f'L64 argument show: {debug_helper}')   # No show
        k, k2 = self.traverseRootAndNestedKey(nestedKey)      # for k, v in self.mprocDictionary.items():
                                                              # File "<string>", line 2, in items
        # Sometimes will cut off here, and won't show the following statements
        # Converting dictProxy to dict:  for k, v in dict(self.mprocDictionary).items(),
        #                                         ...In traverseRootAndNestedKey() method .did not resolve this
        print(f'L65 key: {k}, val: {k2}')                  # display correct values
        #print(f'L68 {self.mprocDictionary[k]}')  # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed>
        #print(f'L69 {dict(self.mprocDictionary[k])}')   # File "<string>", line 2, in __getitem__
        if k is not None:
            # error here, nothing is shown
            temp = self.mprocDictionary[k][k2]
            #temp2 = dict(self.mprocDictionary)[k][k2]   # File "<string>", line 2, in __getitem__
            #temp2[k2] = pid                         # <DictProxy object, typeid 'dict' at 0x784b40fb3d60; '__str__()' failed>
            print(f'L78 {temp}')                     # Will sometimes show None, other times 'A' w/Multiproc
                                                     # Always shows correct value w/Main Proc
            #print(f'L68 {temp2}')
            #temp.append(pid)                       # [0, 0] -> [0, 0, pid]
            # self.dictionary[k][k2].append(pid)    # no good, must overwrite entire list completely.
                                                    # i.e.: mprocDictionary[k][k2] = temp
            #self.mprocDictionary[k][k2]=pid         # Works with MainProc, unknown with MultiProc
                                                    # Intermittent error:
                                                    # File "<string>", line 2, in __getitem__
            temp = pid
            # Alternative method
            self.mprocDictionary[k][k2].update(temp)
    def getMemAddress(self, rootKey, nestedKey):
        k = self.traverseNestedKeyOnly(rootKey, nestedKey)
        try:
            return self.mprocDictionary[rootKey][k]
        except IndexError:
            return None
    def printDictionary(self):
        pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary)
    def printMprocDictionary(self):
        for k, v in self.mprocDictionary.items():
            for k2, v2 in v.items():
                print(f'L78 {k2}  :  {v2}')

splitCMDqueue = multiprocessing.Queue()

qList = [multiprocessing.Queue() for i in range(2)]
listCMDs = []
listCMDs.append(['''system cmd a_1''',
                 '''system cmd a_2''',
                 '''system cmd a_3''',
                 '''system cmd a_4'''])
listCMDs.append(['''system cmd b_1''',
                 '''system cmd b_2''',
                 '''system cmd b_3''',
                 '''system cmd b_4'''])
listCMDcondition = ['''condition_system_cmd_1''',
                    '''condition_system_cmd_2''']

# {'Root Key A': {
#      'Nested key A_1': [1, 0],
#      'Nested key A_2': [1, 0],
#      'Nested key A_3': [1, 0],
#      'Nested key A_4': [1, 0]},
#  'Root Key B': {
#      'Nested key B_1': [1, 0],
#      'Nested key B_2': [1, 0],
#      'Nested key B_3': [1, 0],
#      'Nested key B_4': [1, 0]}}
def worker(sharedClass, nK):
    print(f'L111 {nK}')
    sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110')
    #print("L48") # Shows when previous statement is commented out
    print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}')

jobs = []
runtimeCommandProgress =  ProgressDictionary()

if __name__ == '__main__':

    for idx, i in enumerate(listCMDs):
        runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i)  # works/success
        for j in i:
            qList[idx].put(j)

    example_cmd = listCMDs[0][1]  # Nested key A_2, system cmd a_2
    print(f"L133 {example_cmd}")
    # Both work
    # runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136')
    # print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}')

    p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd))
    jobs.append(p)
    for p in jobs:
        p.start()
Distinct error message read:
File "<string>", line 2, in __getitem__
BrokenPipeError: [Errno 32] Broken pipe
Reply
#2
don't import multiprocessing twice.
remove line 7
Reply
#3
It gave me the message, that the defined class containing the multiprocessing dictionary,
AttributeError: 'ProgressDictionary' object has no attribute 'multiprocessing'

This starts at line 47:
return self.multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})

I removed self, return multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys})
And got the error:

in traverseRootAndNestedKey
    for k, v in dict(self.mprocDictionary).items():  #File "<string>", line 2, in __getitem__
  File "<string>", line 2, in __getitem_
FileNotFoundError: [Errno 2] No such file or directory

Also got BrokenPipeError: [Errno 32] Broken pipe, sometimes, when rerunning the script, without making any changes.

Edit: It seems opening the nested dict as a manager dict/DictProxt data type, resulting in an attempt to open a 2nd manage pipes within a single child process, is the likely culprit, and will need to be converted to a regular dict datatype, instead of multiprocessing.managers.DictProxy data type. The root key was already converted to a an 'str' datatype, automatically. I'll try to report back.

Unfotunately the following is generating an error:
temp_dict = {k: dict(v) if isinstance(v, multiprocessing.managers.DictProxy) else v 
                     for k, v in self.mprocDictionary.items()}
File "<string>", line 2, in keys.
FileNotFoundError: [Errno 2] No such file or directory

Culprit seems to be .items() method.
Reply
#4
Try keeping a reference to the manager
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Taking user input and storing that to a variable then storing that variable to a list jowalk 12 41,141 Mar-27-2017, 11:45 PM
Last Post: wavic

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020