Posts: 45
Threads: 13
Joined: Feb 2020
Feb-09-2021, 01:11 PM
(This post was last modified: Feb-09-2021, 01:11 PM by WiPi.)
Hi guys,
I'm struggling to get the correct code configuration to update an integer variable across multiple processes.
My code in brief is below:
import multiprocessing
from multiprocessing import Value

error_value = Value('i', 0)  # assign as integer type

def get_data(url):
    try:
        ...  # code doing some stuff here
    except ValueError as e:
        # if error thrown up simply count it
        error_value.acquire()
        error_value.value += 1
        error_value.release()
        pass

if __name__ == '__main__':
    # ---- Perform multi-processing
    p = multiprocessing.Pool(6)
    results = p.map(get_data, 'argslist')
    p.close()
    p.join()
    print(str(error_value.value))

So in the first instance I just want to count the number of errors thrown up in the function.
The output in the final print statement is zero but I know this not to be the case.
Any ideas please?
Posts: 2,121
Threads: 10
Joined: May 2017
Feb-09-2021, 02:34 PM
(This post was last modified: Feb-09-2021, 02:34 PM by DeaD_EyE.)
Your code is OK. I just replaced one line for testing:
import multiprocessing
from multiprocessing import Value

error_value = Value('i', 0)  # assign as integer type

def get_data(url):
    try:
        raise ValueError
    except ValueError as e:
        # if error thrown up simply count it
        error_value.acquire()
        error_value.value += 1
        error_value.release()
        pass  # <- not required, does nothing

if __name__ == '__main__':
    # ---- Perform multi-processing
    p = multiprocessing.Pool(6)
    results = p.map(get_data, 'argslist')
    p.close()
    p.join()
    print(str(error_value.value))

Output: 8
This is right, because the str 'argslist' consists of 8 characters, so get_data is called once per character.
With a context manager it's easier to handle the locking:
import multiprocessing
from multiprocessing import Value

error_value = Value('i', 0)  # assign as integer type

def get_data(url):
    try:
        # all even numbers -> ZeroDivisionError
        if url % 2 == 0:
            1 / 0
    except ZeroDivisionError as e:
        with error_value.get_lock():
            # read/write is not an atomic operation
            # a lock is required to do it right
            # the context manager makes the use of it easier
            error_value.value += 1
            print("ZeroDivisionError -->", url, "/ 0")

if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        results = p.map(get_data, [0, 1, 2, 3, 4])
        # usually you should consume the results
        # get_data does not explicitly return anything,
        # so it implicitly returns None for each call
        for result in results:
            ...
        # pro tip: the 3 dots are Ellipsis, a placeholder also used by numpy
        # I often use it as a placeholder for missing code
    print("Total errors:", error_value.value)

multiprocessing.Pool can also be used as a context manager.
The Pool is then automatically closed when leaving the block.
Output:
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
Posts: 45
Threads: 13
Joined: Feb 2020
(Feb-09-2021, 02:34 PM)DeaD_EyE Wrote: Your code is OK. [...]
Thanks very much for your reply. Sorry for any stupid questions coming up but I am not a programmer...
I ran the code and got this:
Output: ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 0
Any idea why the final output has not counted the errors?
If I put a print statement inside the exception handler, I can see the error_value variable updating, but the change does not carry through to the final output statement.
I don't know if this has something to do with it, but I am using Python 3.7.1 with the IDLE IDE, which I know is not good! In fact I need to run the script from the cmd console to see 'print' output, because the IDLE shell does not show it during multi-processing. Even so, the print output in the exception handler still appears, but the final error_value does not carry over. Frustrating!
Posts: 2,121
Threads: 10
Joined: May 2017
If I try my own code with IDLE, then nothing works :-D
I usually run code from the command line.
It seems that multiprocessing has some trouble with IDLE.
Under IDLE I tried it with Python 3.9.1 and 3.5.10: no results at all.
Here my tests:
andre@andre-GP70-2PE:~$ pyenv shell 3.
3.5.10 3.7.7 3.8.6 3.9.1
andre@andre-GP70-2PE:~$ pyenv shell 3.5.10
andre@andre-GP70-2PE:~$ python test1.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.7.7
andre@andre-GP70-2PE:~$ python test1.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.8.6
andre@andre-GP70-2PE:~$ python test1.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 4 / 0
ZeroDivisionError --> 2 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.9.1
andre@andre-GP70-2PE:~$ python test1.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$
Posts: 3,458
Threads: 101
Joined: Sep 2016
If I run the code from the command line (on Windows 10, at least), I also get 0:
Output:
> python .\spam.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 0
If you want to use anything inside a function that's running in a separate process, pass it to the function. Do not rely on globals at all. It's possible that each process creates its own copy of error_value. Multiprocessing also works differently depending on what platform you're on, which leads to different results: on most Unix-like platforms it uses fork(), while on Windows it spawns a fresh interpreter and re-imports the module.
But you can't just pass the Value object through the process pool, because Pool.map() pickles its arguments and a raw multiprocessing.Value can't be pickled that way (or at least that's what I gather from the 10 seconds I looked at the error). Which means using a multiprocessing.Manager, and creating a Value through that.
With these changes made, I'm left with the following:

import itertools
import multiprocessing
from multiprocessing import Value

# error_value = Value('i', 0)  # assign as integer type

def get_data(args):
    url, error_value = args
    try:
        # all even numbers -> ZeroDivisionError
        if url % 2 == 0:
            1 / 0
    except ZeroDivisionError as e:
        # with error_value.get_lock():
        # read/write is not an atomic operation
        # a lock is required to do it right
        # the context manager makes the use of it easier
        error_value.value += 1
        print("ZeroDivisionError -->", url, "/ 0")

if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product([0, 1, 2, 3, 4], [error_value])
            results = p.map(get_data, args)
            # usually you should consume the results
            # get_data does not explicitly return anything,
            # so it implicitly returns None for each call
            for result in results:
                ...
            # pro tip: the 3 dots are Ellipsis, a placeholder also used by numpy
            # I often use it as a placeholder for missing code
            print("Total errors:", error_value.value)

Which yields the following:
Output:
> python .\spam.py
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
ZeroDivisionError --> 0 / 0
Total errors: 3
This might be a good time to mention how *hard* parallel programming is.
Posts: 45
Threads: 13
Joined: Feb 2020
Feb-11-2021, 09:00 AM
(This post was last modified: Feb-11-2021, 09:00 AM by WiPi.)
(Feb-10-2021, 11:12 PM)nilamo Wrote: If I run the code from the command line (on Windows 10, at least), I also get 0: [...]
This actually works on my system (which is running Windows 10), so thank you both for submitting code for this.
I knew multiprocessing runs separate instances and that variables therefore need to be shared (somehow), but it's a minefield!
I was using multi-threading, which is a bit less complicated but seems to throw up more processing errors when using Pandas (for example).
As I mentioned, I'm not a programmer, so I was interested to see the format for passing arguments to a function. Rather than using 'global variable' statements, you would recommend passing them directly into the function like this?

def get_data(args):
    url, error_value, var3, var4, var5 = args
    try:
        ...

if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product([0, 1, 2, 3, 4], [error_value], [var3], [var4], [var5])

I also mentioned I am using the IDLE IDE, which I know has limitations. I know about PyDev and PyCharm, but I would have to download Eclipse as well for PyDev.
What IDE would you recommend that I can download to use with my existing installation of the actual Python software?
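For reference, a hedged sketch of that argument-passing pattern with one extra shared argument (the `label` value and the `get_data` body here are made-up placeholders, not code from the thread). `zip` with `itertools.repeat` is an alternative to `itertools.product` when every url should be paired with the same shared extras:

```python
import itertools
import multiprocessing

def get_data(args):
    # unpack one (url, error_value, label) tuple per call
    url, error_value, label = args
    if url % 2 == 0:
        error_value.value += 1  # count even "urls" as errors
    return f"{label}-{url}"

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            urls = [0, 1, 2, 3, 4]
            # zip + repeat pairs each url with the same shared extras
            args = zip(urls,
                       itertools.repeat(error_value),
                       itertools.repeat("job"))
            results = p.map(get_data, args)
            print(results)
            print("Total errors:", error_value.value)
```

With `product` and single-element lists the result is the same; `repeat` just states the intent more directly.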
Posts: 2,121
Threads: 10
Joined: May 2017
Yes, multiprocessing/threading is hard, and testing it in different environments can give different results.
I had just forgotten to use a Manager for it.
It should also work if the multiprocessing.Value() is created at module level but given to the worker function as an argument.
Maybe you can try it yourself.
Posts: 3,458
Threads: 101
Joined: Sep 2016
(Feb-11-2021, 09:00 AM)WiPi Wrote: What IDE would you recommend that I can download to use with my existing installation of the actual Python software?
I like VS Code. But anything other than IDLE should be fine. I wish they'd remove IDLE from the default Python installation.
Posts: 6,779
Threads: 20
Joined: Feb 2020
Windows doesn't have fork, so it starts a new process and imports the Python code. This can give you very different results; the most obvious and immediate is that any code not part of a function and not hidden behind an `if __name__ == '__main__'` guard gets executed when you start the new process.
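A minimal sketch of that behaviour (forcing the 'spawn' start method, which is the Windows default, so the re-import is visible on any platform):

```python
import multiprocessing

# This line runs once in the main process and, under spawn, once
# more in every worker process when the module is re-imported.
print("imported in", multiprocessing.current_process().name)

def double(x):
    return x * 2

if __name__ == '__main__':
    # everything in this guarded block runs only in the main process
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(2) as p:
        print(p.map(double, [1, 2, 3]))  # [2, 4, 6]
```

Running this prints the "imported in" line several times, once per process, which is exactly why module-level side effects are dangerous on Windows.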
Posts: 45
Threads: 13
Joined: Feb 2020
Feb-12-2021, 11:02 AM
(This post was last modified: Feb-12-2021, 11:02 AM by WiPi.)
Hi guys,
Just when I thought I had it cracked, I've run into another odd problem with the multi-processing code.
I am simply counting the number of times a condition is met, so in the code below the variable being updated sits inside an 'if' statement.
import multiprocessing
from multiprocessing import Value
import itertools

def get_data(args):
    url, error_value = args
    try:
        ...
        if 'condition is met':
            error_value.value += 1
            print('error_value = ', error_value.value)
        ...
    except Exception as e:
        ...

if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product(input_list, [error_value])
            results = p.map(get_data, args)
            for result in results:
                ...
    print("Total errors:", error_value.value)

The first 'print' statement within the function updates and displays increasing values as the condition is met.
The script produces the correct results output; however, the final 'print' statement comes up with this error:
Output: Traceback (most recent call last):
File "C:\'mylocation'\Python\Python37-32\lib\multiprocessing\managers.py", line 788, in _callmethod
conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 1115, in get
return self._callmethod('get')
File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 792, in _callmethod
self._connect()
File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 779, in _connect
conn = self._Client(self._token.address, authkey=self._authkey)
File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\connection.py", line 490, in Client
c = PipeClient(address)
File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\connection.py", line 691, in PipeClient
_winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified
Has this got something to do with Windows having no fork, or is it maybe a queueing/process problem?
Any further help welcome.
thanks
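One plausible explanation, offered as an assumption since the full script isn't shown: a Manager proxy raises exactly this kind of connection error when it is used after the `with multiprocessing.Manager()` block has exited, because the manager's server process has already shut down by then. A minimal sketch:

```python
import multiprocessing

if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        error_value = manager.Value('i', 0)
        error_value.value = 3
        # works: the manager's server process is still running
        print("inside block:", error_value.value)
        final_count = error_value.value  # copy into a plain int
    # Reading error_value.value out here, after the with-block, raises
    # the same kind of connection error as in the traceback above.
    print("after block:", final_count)
```

If that is the cause, moving the final print inside the with-block, or copying error_value.value into a plain int before the block ends, should avoid the traceback.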