Python Forum
Help with multiprocessing
#1
Hi all

I am trying to learn the concurrent.futures module to (hopefully) speed up my code with multiprocessing and/or multithreading. The sequential version of my code is:

import math
if __name__ == '__main__':
    classargs = []
    p1 = lambda age: math.log(age)
    p2 = lambda ratio: (math.log(ratio))*(-0.5390)
    p3 = lambda SBP: math.log(SBP)*-(1.4032)
    p4 = lambda LVH: LVH*(-0.3362)
    p5 = lambda smoking: smoking*(-0.3899)
    p6 = lambda diabetes: diabetes

    parameter = [p1,p2,p3,p4,p5,p6]
    result = [48,4.5,127,0,1,1]  #to be read from Excel file; yet to be implemented

    for param, res in zip(parameter,result):
        outcome = (param(res))
        classargs.append(outcome)
    print(classargs)   #to pass the values in the list as instance of the class
The following works:

import concurrent.futures
import math
if __name__ == '__main__':
    classargs = []
    p1 = lambda age: math.log(age)
    p2 = lambda ratio: (math.log(ratio))*(-0.5390)
    p3 = lambda SBP: math.log(SBP)*-(1.4032)
    p4 = lambda LVH: LVH*(-0.3362)
    p5 = lambda smoking: smoking*(-0.3899)
    p6 = lambda diabetes: diabetes

    parameter = [p1,p2,p3,p4,p5,p6]
    result = [48,4.5,127,0,1,1]

    with concurrent.futures.ThreadPoolExecutor() as executor:
        for param,res in zip(parameter, result):
            outcome = executor.submit(param,res)
            classargs.append(outcome.result())

    print(classargs) #to pass the values in the list as instance of the class
When I run the following:

import concurrent.futures
import math
if __name__ == '__main__':
    classargs = []
    p1 = lambda age: math.log(age)
    p2 = lambda ratio: (math.log(ratio))*(-0.5390)
    p3 = lambda SBP: math.log(SBP)*-(1.4032)
    p4 = lambda LVH: LVH*(-0.3362)
    p5 = lambda smoking: smoking*(-0.3899)
    p6 = lambda diabetes: diabetes

    parameter = [p1,p2,p3,p4,p5,p6]
    result = [48,4.5,127,0,1,1]

    with concurrent.futures.ProcessPoolExecutor() as executor:  #changed from multithread to multiprocess
        for param,res in zip(parameter, result):
            outcome = executor.submit(param,res)
            classargs.append(outcome.result())

    print(classargs) #to pass the values in the list as instance of the class
I get this error:

Output:
  File "framingham_multiprocessor.py", line 54, in <module>
    classargs.append(outcome.result())
  File "C:\Users\test\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 439, in result
    return self.__get_result()
  File "C:\Users\test\AppData\Local\Programs\Python\Python38\lib\concurrent\futures\_base.py", line 388, in __get_result
    raise self._exception
  File "C:\Users\test\AppData\Local\Programs\Python\Python38\lib\multiprocessing\queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "C:\Users\test\AppData\Local\Programs\Python\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001316CEB5160>: attribute lookup <lambda> on __main__ failed
The idea of the code is to run a bunch of functions to transform values that the user inputs, and pass these transformed values to a class (not shown here). Is there a way to make the above code work with multiprocessing? (I am more likely to benefit from multithreading, or even the sequential version, in this case because the task is more likely to be IO bound, but I am trying to understand the techniques of multiprocessing).

Thanks in advance.
#2
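It's because multiprocessing serializes (pickles) objects when it passes them to worker processes, and lambda functions are not picklable: pickle stores a function by its module and name, and a lambda has no name that can be looked up again. If you change your lambdas from this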
p1 = lambda age: math.log(age)
to this
def p1(age):
    return math.log(age)
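it should work. On Windows in particular, define the functions at module level, outside the if __name__ == '__main__': block, so that the spawned worker processes can find them by name.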
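For reference, here is a minimal sketch of how the whole script might look with named functions. The coefficients are copied from the original post; is_picklable and transform are hypothetical helper names introduced only for this illustration. It also submits all tasks before collecting any result, since calling result() straight after each submit() makes the tasks run one at a time.

```python
import concurrent.futures
import math
import pickle


# Module-level functions are pickled by name, so worker processes can
# look them up again. Coefficients mirror the lambdas in the question.
def p1(age):
    return math.log(age)


def p2(ratio):
    return math.log(ratio) * -0.5390


def p3(sbp):
    return math.log(sbp) * -1.4032


def p4(lvh):
    return lvh * -0.3362


def p5(smoking):
    return smoking * -0.3899


def p6(diabetes):
    return diabetes


def is_picklable(obj):
    """Hypothetical helper: return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def transform(functions, values):
    """Hypothetical helper: submit every task, then collect results in order."""
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(f, v) for f, v in zip(functions, values)]
        return [future.result() for future in futures]


if __name__ == '__main__':
    # A lambda has no importable name, so pickle rejects it.
    print(is_picklable(lambda age: math.log(age)))
    print(is_picklable(p1))

    parameter = [p1, p2, p3, p4, p5, p6]
    result = [48, 4.5, 127, 0, 1, 1]
    print(transform(parameter, result))
```

For six tiny calculations like these, the cost of starting processes will most likely outweigh any gain, so treat this as a way to learn the API rather than a speed-up.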

I would suggest asyncio for IO tasks - https://docs.python.org/3/library/asyncio.html
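As a rough idea of what that could look like, here is a small sketch using asyncio.gather. The fetch_value coroutine is a hypothetical stand-in for a real I/O-bound read (the Excel file in the question is not implemented here), and asyncio.sleep only simulates the waiting.

```python
import asyncio


async def fetch_value(name, value, delay=0.01):
    """Hypothetical stand-in for an I/O-bound read (file, network, database)."""
    await asyncio.sleep(delay)  # simulates waiting on I/O without blocking
    return name, value


async def load_inputs():
    # Start all simulated reads concurrently; gather keeps the input order.
    pairs = await asyncio.gather(
        fetch_value('age', 48),
        fetch_value('ratio', 4.5),
        fetch_value('SBP', 127),
    )
    return dict(pairs)


if __name__ == '__main__':
    print(asyncio.run(load_inputs()))
```

The waits overlap, so the total time is close to the longest single read rather than the sum of all of them.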
#3
Thank you for that.