Python Forum
Updating variables in Multi-processing
#1
Hi guys,

I'm struggling to get the correct code configuration to update an integer variable shared across processes when using multiprocessing!

My code in brief is below:

import multiprocessing
from multiprocessing import Value

error_value = Value('i',0)  #assign as integer type

def get_data(url):
    try:
        ...  # code doing some stuff here

    except ValueError as e:
       #if error thrown up simply count it
        error_value.acquire()
        error_value.value += 1
        error_value.release()
        pass 


if __name__ == '__main__':

# ---- Perform multi-processing 
    p = multiprocessing.Pool(6)
    results = p.map(get_data,'argslist')
    p.close()
    p.join()

    print(str(error_value.value))
So in the first instance I just want to count the number of errors thrown up in the function.

The output in the final print statement is zero, but I know that is not the case.

Any ideas please?
#2
Your code is ok.
I just replaced one line for testing:
import multiprocessing
from multiprocessing import Value
 
error_value = Value('i',0)  #assign as integer type
 
def get_data(url):
    try:
        raise ValueError
 
    except ValueError as e:
       #if error thrown up simply count it
        error_value.acquire()
        error_value.value += 1
        error_value.release()
        pass # <- not required, does nothing
 
 
if __name__ == '__main__':
 
# ---- Perform multi-processing 
    p = multiprocessing.Pool(6)
    results = p.map(get_data,'argslist')
    p.close()
    p.join()
 
    print(str(error_value.value))
Output:
8
This is right, because the str "argslist" consists of 8 elements.


With a context manager it's easier to handle the locking:
import multiprocessing
from multiprocessing import Value
 
error_value = Value('i',0)  #assign as integer type

def get_data(url):
    try:
        # all even numbers -> ZeroDivisionError
        if url % 2 == 0:
            1 / 0
    except ZeroDivisionError as e:
        with error_value.get_lock():
            # read/write is not an atomic operation;
            # a lock is required to do it right, and
            # the context manager makes using it easier
            error_value.value += 1
            print("ZeroDivisionError -->", url, "/ 0")
 
 
if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        results = p.map(get_data, [0,1,2,3,4])
        # usually you should consume the results;
        # get_data does not explicitly return anything,
        # so each call implicitly returns None
        for result in results:
            ...
        # pro tip: the 3 dots are Ellipsis, a placeholder also used by numpy;
        # I often use it as a placeholder for missing code
        print("Total errors:", error_value.value)
multiprocessing.Pool can also be used as a context manager.
The Pool is then closed automatically when leaving the block.

Output:
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
#3
(Feb-09-2021, 02:34 PM)DeaD_EyE Wrote: Your code is ok. [...]


Thanks very much for your reply. Sorry for any stupid questions coming up but I am not a programmer...

I ran the code and got this:

Output:
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 0
No idea why the final output has not counted the errors?

If I put a print statement inside the exception handler, I can see the error_value variable updating, but the change does not carry through to the final output statement.

I don't know if it's related, but I am using Python 3.7.1 with the IDLE IDE, which I know is not ideal. In fact I have to run the script from the cmd console to see 'print' output, because the IDLE shell does not show it during multiprocessing. Even so, the print output from the exception handler still appears there, but the final error_value does not carry over. Frustrating!
#4
If I try my own code with IDLE, then nothing works :-D

I usually run code from the command line.
It seems that multiprocessing has some trouble with IDLE.
I also tried it with Python 3.9.1 and 3.5.10 in IDLE: no results either.

Here are my tests:
andre@andre-GP70-2PE:~$ pyenv shell 3.
3.5.10  3.7.7   3.8.6   3.9.1   
andre@andre-GP70-2PE:~$ pyenv shell 3.5.10 
andre@andre-GP70-2PE:~$ python test1.py 
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.7.7 
andre@andre-GP70-2PE:~$ python test1.py 
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.8.6 
andre@andre-GP70-2PE:~$ python test1.py 
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 4 / 0
ZeroDivisionError --> 2 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ pyenv shell 3.9.1 
andre@andre-GP70-2PE:~$ python test1.py 
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 3
andre@andre-GP70-2PE:~$ 
#5
If I run the code from the command line (on Windows 10, at least), I also get 0:
Output:
> python .\spam.py
ZeroDivisionError --> 0 / 0
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
Total errors: 0
If you want to use anything inside a function that's running on a separate process, pass it to the function. Do not rely on globals at all. It's possible that each process creates its own copy of error_value. Multiprocessing, in addition, works differently depending on what platform you're on, which leads to different results (on most platforms it uses fork(), while on Windows it uses spawn, which starts a fresh interpreter and re-imports your module).

But you can't just pass the Value object through the process pool, because Pool.map() can't pickle it (or at least that's what I gather from the 10 seconds I looked at the error). Which means using a multiprocessing.Manager and creating a Value through that.

With these changes made, I'm left with the following:
import itertools
import multiprocessing
from multiprocessing import Value

# error_value = Value('i', 0)  # assign as integer type


def get_data(args):
    url, error_value = args
    try:
        # all even numbers -> ZeroDivisionError
        if url % 2 == 0:
            1 / 0
    except ZeroDivisionError as e:
        # with error_value.get_lock():  # a Manager Value proxy has no get_lock()
        error_value.value += 1
        print("ZeroDivisionError -->", url, "/ 0")


if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product([0, 1, 2, 3, 4], [error_value])
            results = p.map(get_data, args)
            # usually you should consume the results;
            # get_data does not explicitly return anything,
            # so each call implicitly returns None
            for result in results:
                ...
            # pro tip: the 3 dots are Ellipsis, a placeholder also used by numpy;
            # I often use it as a placeholder for missing code
            print("Total errors:", error_value.value)
Which yields the following:
Output:
> python .\spam.py
ZeroDivisionError --> 2 / 0
ZeroDivisionError --> 4 / 0
ZeroDivisionError --> 0 / 0
Total errors: 3
This might be a good time to mention how *hard* parallel programming is.
#6
(Feb-10-2021, 11:12 PM)nilamo Wrote: If I run the code from the command line (on Windows 10, at least), I also get 0: [...]


This actually works on my system!! (which is running Windows 10) - so thank you (both) for submitting code for this.

I knew multiprocessing runs separate processes and that variables therefore need to be shared (somehow), but it's a minefield!

I was using multi-threading, which is a bit less complicated but seems to throw up more processing errors when using Pandas (for example).

As I mentioned, I'm not a programmer, so I was interested to see the format for passing arguments to a function. So rather than use 'global variable' statements, you would recommend passing them directly to the function like this?

def get_data(args):
    url, error_value, var3, var4, var5 = args
    try:
       ...


if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product([0, 1, 2, 3, 4], [error_value], [var3], [var4], [var5])
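For what it's worth, that itertools.product pattern does work as long as each extra list has exactly one element, but for constant extras functools.partial is a common alternative: bind the fixed values once and let map() vary only the first argument. A small sketch with made-up parameter names (factor, offset):

```python
import functools
import multiprocessing

def get_data(url, factor, offset):
    # url varies per call; factor and offset are the same for every call
    return url * factor + offset

def run(urls):
    with multiprocessing.Pool(4) as pool:
        # bind the constant arguments once instead of repeating them
        # in every tuple; pool.map then only varies url
        worker = functools.partial(get_data, factor=10, offset=1)
        return pool.map(worker, urls)

if __name__ == '__main__':
    print(run([0, 1, 2]))  # [1, 11, 21]
```

This keeps the worker function's signature readable instead of unpacking one big args tuple at the top.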
I also mentioned I am using the IDLE IDE, which I know has limitations. I know about PyDev and PyCharm, but I would have to download Eclipse as well for PyDev.

What IDE would you recommend that I can download to use with my existing installation of Python?
#7
Yes, multiprocessing/threading is hard, and testing it in different environments can give different results.
I had just forgotten to use a Manager for it.

It should also work if the multiprocessing.Value() is created at module level but given to the worker function as an argument.

Maybe try it yourself.
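A sketch of a related route that also works on Windows (the initializer/initargs wiring is my own, not something shown in this thread): create the Value in the parent and hand it to each worker once through the Pool's initializer, so workers inherit it instead of receiving it through map():

```python
import multiprocessing
from multiprocessing import Value

error_value = None  # set in each worker by the initializer below

def init_worker(shared):
    # runs once per worker process at Pool start-up
    global error_value
    error_value = shared

def get_data(url):
    try:
        if url % 2 == 0:  # all even numbers -> ZeroDivisionError
            1 / 0
    except ZeroDivisionError:
        with error_value.get_lock():  # a real Value, so get_lock() exists
            error_value.value += 1

def count_errors(urls):
    shared = Value('i', 0)
    with multiprocessing.Pool(4, initializer=init_worker,
                              initargs=(shared,)) as pool:
        pool.map(get_data, urls)
    return shared.value

if __name__ == '__main__':
    print("Total errors:", count_errors([0, 1, 2, 3, 4]))  # Total errors: 3
```

Because initargs are passed at process creation time, the synchronized Value travels by inheritance, which avoids the pickling error you get when passing it through map().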
#8
(Feb-11-2021, 09:00 AM)WiPi Wrote: What IDE would you recommend that I can download to use with my exiting installation of the actual Python software?

I like VS Code. But anything other than IDLE should be fine. I wish they'd remove idle from the default python installation.
#9
Windows doesn't have fork, so it starts a new process and imports the Python code. This can give you very different results, the most obvious and immediate being that any code not part of a function and not hidden by a conditional gets executed when you start the new process.
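A minimal sketch of why the usual `if __name__ == '__main__':` guard matters under that start method (function names here are mine):

```python
import multiprocessing

def square(x):
    return x * x

def run_pool():
    # Under spawn (Windows), each worker re-imports this module from scratch.
    # Creating the Pool only behind the __main__ guard (or inside a function
    # that only the parent calls) keeps workers from spawning more workers.
    with multiprocessing.Pool(2) as pool:
        return pool.map(square, [1, 2, 3])

if __name__ == '__main__':
    print(run_pool())  # [1, 4, 9]
```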
#10
Hi guys,

Just when I thought I had it cracked I've run into another 'odd' problem with the multi-processing code.

I am simply counting the number of times a condition is met - so as in the code below the variable being used is inside an 'if' statement.

import multiprocessing
from multiprocessing import Value
import itertools

def get_data(args):
    url, error_value = args
    try:
        ...
        if 'condition is met':
            error_value += 1
            print('error_value = ', error_value.value)
        ...

    except Exception as e:
        ...


if __name__ == '__main__':
    with multiprocessing.Pool(6) as p:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            args = itertools.product(input_list, [error_value])
            results = p.map(get_data, args)

    for result in results:
        ...
    print("Total errors:", error_value.value)
The first 'print' statement within the function updates and displays increasing values as the condition is met.

The script produces the correct results output; however, the final 'print' statement comes up with this error:

Output:
Traceback (most recent call last):
  File "C:\'mylocation'\Python\Python37-32\lib\multiprocessing\managers.py", line 788, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 1115, in get
    return self._callmethod('get')
  File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 792, in _callmethod
    self._connect()
  File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\managers.py", line 779, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\connection.py", line 490, in Client
    c = PipeClient(address)
  File "C:\mylocation\Programs\Python\Python37-32\lib\multiprocessing\connection.py", line 691, in PipeClient
    _winapi.WaitNamedPipe(address, 1000)
FileNotFoundError: [WinError 2] The system cannot find the file specified
Has this got something to do with Windows having no fork, or is it maybe a queueing/process problem?
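For what it's worth, a hedged reading of that traceback: the final print runs after the `with multiprocessing.Manager()` block has exited, so the manager's server process is already gone when the proxy is read. A sketch (with a stand-in even-number condition, since the real one is elided) that reads the value while the manager is still alive:

```python
import itertools
import multiprocessing

def get_data(args):
    url, error_value, lock = args
    # stand-in for "condition is met" from the snippet above
    if url % 2 == 0:
        with lock:                   # a Manager Value proxy has no get_lock(),
            error_value.value += 1   # so share an explicit Lock instead

def count_matches(items):
    with multiprocessing.Pool(4) as pool:
        with multiprocessing.Manager() as manager:
            error_value = manager.Value('i', 0)
            lock = manager.Lock()
            args = itertools.product(items, [error_value], [lock])
            pool.map(get_data, args)
            # read the proxy while the manager is still alive; once the
            # "with manager:" block exits, its server process shuts down
            # and any later access raises a connection error like the one above
            return error_value.value

if __name__ == '__main__':
    print("Total matches:", count_matches([0, 1, 2, 3, 4]))
```

Also note the increment goes through error_value.value; `error_value += 1` does not update the shared counter (it fails or rebinds the local name), which may be part of why the snippet above behaves oddly.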

Any further help welcome.

thanks

