Dec-19-2024, 01:25 AM
The standard
My aim is to store and isolate/compartmentalize these DictProxy to a separate class, that will only have one instance through out the execution of the program. The purpose of the dictionary is to keep track of what tasks have been completed, so that a certain type of command may be freely executed by any of the child processes, waiting for more tasks, regardless.
The code in question success fully stores and loads the contents of the manager dictionary. Unforunately problem arises when one tries to modify the value of a nested key value. To make matters worse, these are intermittent errors. I.e. on separate occassion it will cease execution a few statements before (like 5 lines in this case), than the last time it was executed, when not a single change is made to the code.
I have re-written the program in question, to be much more simple and executed as its would on my own computer, without need to import any text file and such. I've also tried a BaseManager class, and same issue occurs.
`class MyManager(multiprocessing.managers.BaseManager):
pass`
To keep things simpler, the value in question is just a char, instead of a list, [0, 0], since I may need to implement Manager.List.
multiprocessing.managers.BaseManager
module does not permit or recognize the use of nested objects, forcing one to resort to multiprocessing.Manager().dict()
module. This module must also be used again on the nested keys of a dictionary.My aim is to store and isolate/compartmentalize these DictProxy to a separate class, that will only have one instance through out the execution of the program. The purpose of the dictionary is to keep track of what tasks have been completed, so that a certain type of command may be freely executed by any of the child processes, waiting for more tasks, regardless.
The code in question success fully stores and loads the contents of the manager dictionary. Unforunately problem arises when one tries to modify the value of a nested key value. To make matters worse, these are intermittent errors. I.e. on separate occassion it will cease execution a few statements before (like 5 lines in this case), than the last time it was executed, when not a single change is made to the code.
I have re-written the program in question, to be much more simple and executed as its would on my own computer, without need to import any text file and such. I've also tried a BaseManager class, and same issue occurs.
`class MyManager(multiprocessing.managers.BaseManager):
pass`
To keep things simpler, the value in question is just a char, instead of a list, [0, 0], since I may need to implement Manager.List.
import multiprocessing import multiprocessing.managers import os, sys import pprint class ProgressDictionary: import multiprocessing # Defining a second 'multiprocessing' module, may be the cause of # AttributeError: 'ForkAwareLocal' object has no attribute 'connection' error def __init__(self): self.dictionary = {} # E.g. k[0] = {'Root Key A': Manager().dict({'Nested key A_1': [1, 0], # 'Nested key A_2': [1, 0], # 'Nested key A_3': [1, 0], # 'Nested key A_4': [1, 0]})} self.mprocDictionary = multiprocessing.Manager().dict() # Will need to be rebuilt each time self.dictionary is .updated() # E.g. mprocDictionary.update(<......>)) # = Manager().dict{'Root Key A': Manager().dict({'Nested key A_1': [1, 0], # ... # 'Nested key A_4': [1, 0]}), # 'Root Key B': Manager().dict({'Nested key B_1': [1, 0], # ... # 'Nested key B_4': [1, 0]})} self.container = multiprocessing.Manager().dict() # Not used def buildMprocDict(self, rootKey, listOfNestedKeys): self.dictionary.update({rootKey: self.addNest(listOfNestedKeys)}) # Debug # pprint.PrettyPrinter(width=128, sort_dicts=False).pprint(self.dictionary) # No braces as intended -> 'Root Key A': <DictProxy object, typeid 'dict' at 0x72335a1908e0> self.mprocDictionary.update(self.dictionary) # Debug # print(type(self.mprocDictionary)) # {'Root Key A': <DictProxy object, typeid 'dict' at 0x79b399d989a0>, # 'Root Key B': <DictProxy object, typeid 'dict' at 0x79b399d98eb0>} # type(self.mprocDictionary): <class 'multiprocessing.managers.DictProxy'> # Not used self.container.update({i : multiprocessing.Manager().dict() for i in self.mprocDictionary.keys()}) # Debug # for k, v in self.mprocDictionary.items(): # print(f'{k} ---:--- {v}') # print(f'K43: {[i for i in self.mprocDictionary.keys()]}') # ['Root Key A', 'Root Key B'] # print('L26: class function exit') def addNest(self, listOfNestedKeys): # Encapsulate nested dicts return self.multiprocessing.Manager().dict({i: 'A' for i in listOfNestedKeys}) def traverseNestedKeyOnly(self, rootKey, nestedKey): for k, v in dict(self.mprocDictionary.get(rootKey, {}).items()): if k == nestedKey: return k def traverseRootAndNestedKey(self, nestedKey): print(f'L54') for k, v in dict(self.mprocDictionary).items(): #File "<string>", line 2, in __getitem__ for k2, v2 in v.items(): # We only know value, but not key, hence roots is also traversed # In pyCharm, v.items() gives warning: Unresolved attribute reference 'items' for class 'str' if k2 == nestedKey: return [k, k2] #if nested key is not found due to being deleted earlier for whatever reason return None def setMemAddress(self, nestedKey, pid, debug_helper): # *args used, incase root key is never supplied. print(f'L64 argument show: {debug_helper}') # No show k, k2 = self.traverseRootAndNestedKey(nestedKey) # for k, v in self.mprocDictionary.items(): # File "<string>", line 2, in items # Sometimes will cut off here, and won't show the following statements # Converting dictProxy to dict: for k, v in dict(self.mprocDictionary).items(), # ...In traverseRootAndNestedKey() method .did not resolve this print(f'L65 key: {k}, val: {k2}') # display correct values #print(f'L68 {self.mprocDictionary[k]}') # <DictProxy object, typeid 'dict' at 0x796feca0fd60; '__str__()' failed> #print(f'L69 {dict(self.mprocDictionary[k])}') # File "<string>", line 2, in __getitem__ if k is not None: # error here, nothing is shown temp = self.mprocDictionary[k][k2] #temp2 = dict(self.mprocDictionary)[k][k2] # File "<string>", line 2, in __getitem__ #temp2[k2] = pid # <DictProxy object, typeid 'dict' at 0x784b40fb3d60; '__str__()' failed> print(f'L78 {temp}') # Will sometimes show None, other times 'A' w/Multiproc # Always shows correct value w/Main Proc #print(f'L68 {temp2}') #temp.append(pid) # [0, 0] -> [0, 0, pid] # self.dictionary[k][k2].append(pid) # no good, must overwrite entire list completely. # i.e.: mprocDictionary[k][k2] = temp #self.mprocDictionary[k][k2]=pid # Works with MainProc, unknown with MultiProc # Intermittent error: # File "<string>", line 2, in __getitem__ temp = pid # Alternative method self.mprocDictionary[k][k2].update(temp) def getMemAddress(self, rootKey, nestedKey): k = self.traverseNestedKeyOnly(rootKey, nestedKey) try: return self.mprocDictionary[rootKey][k] except IndexError: return None def printDictionary(self): pprint.PrettyPrinter(width=96, sort_dicts=False).pprint(self.dictionary) def printMprocDictionary(self): for k, v in self.mprocDictionary.items(): for k2, v2 in v.items(): print(f'L78 {k2} : {v2}') splitCMDqueue = multiprocessing.Queue() qList = [multiprocessing.Queue() for i in range(2)] listCMDs = [] listCMDs.append(['''system cmd a_1''', '''system cmd a_2''', '''system cmd a_3''', '''system cmd a_4''']) listCMDs.append(['''system cmd b_1''', '''system cmd b_2''', '''system cmd b_3''', '''system cmd b_4''']) listCMDcondition = ['''condition_system_cmd_1''', '''condition_system_cmd_2'''] # {'Root Key A': { # 'Nested key A_1': [1, 0], # 'Nested key A_2': [1, 0], # 'Nested key A_3': [1, 0], # 'Nested key A_4': [1, 0]}, # 'Root Key B': { # 'Nested key B_1': [1, 0], # 'Nested key B_2': [1, 0], # 'Nested key B_3': [1, 0], # 'Nested key B_4': [1, 0]}} def worker(sharedClass, nK): print(f'L111 {nK}') sharedClass.setMemAddress(nK, '0x12345FA', 'Call from Line 110') #print("L48") # Shows when previous statement is commented out print(f'L130, {sharedClass.getMemAddress(listCMDcondition[0], nK)}') jobs = [] runtimeCommandProgress = ProgressDictionary() if __name__ == '__main__': for idx, i in enumerate(listCMDs): runtimeCommandProgress.buildMprocDict(listCMDcondition[idx], i) # works/success for j in i: qList[idx].put(j) example_cmd = listCMDs[0][1] # Nested key A_2, system cmd a_2 print(f"L133 {example_cmd}") # Both work # runtimeCommandProgress.setMemAddress(example_cmd, '0x12345FA', 'L136') # print(f'L133 {runtimeCommandProgress.getMemAddress(listCMDcondition[0], example_cmd)}') p = multiprocessing.Process(target=worker, args=(runtimeCommandProgress, example_cmd)) jobs.append(p) for p in jobs: p.start()Distinct error message read:
File "<string>", line 2, in __getitem__
BrokenPipeError: [Errno 32] Broken pipe