Python Forum
Python Parallel Programming
#1
Hi all,
This is the first time I am using parallel programming in Python. My program is shown below:


import numpy as np
from time import time
import multiprocessing as mp


# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])#print(arr)
data = arr.tolist()
data[:5]


# Define the function
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

# Solution 2: Parallelization
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]
# Step 3: Don't forget to close
pool.close()
# Step 4: Result
print(results[:10])
Running this program gives the following error:
Error:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\109_personel\113_ARIMA_Tmin\Test_2.py", line 24, in <module>
    pool = mp.Pool(mp.cpu_count())
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 119, in Pool
    context=self.get_context())
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 176, in __init__
    self._repopulate_pool()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
    w.start()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 33, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Any help solving this problem would be appreciated.
#2
The problem seems to be that your list comprehension is sending a new task to the Pool before the workers in the Pool have completed their tasks. If you have four workers in the pool, you can issue four tasks at a time. Then, you must wait for one of them to finish before issuing subsequent tasks. Essentially, the loop of the comprehension is too fast and isn't waiting.

To fix this, I would refactor to do the following:
  1. Divide the data into x parts, where x is the number of processors available.
  2. Create a new function to pass into Pool.apply(). This function takes one of the sections from #1 and iterates over it, calling howmany_within_range() for each row.
  3. After calling Pool.close(), call Pool.join() to block the main process until the Pool workers have finished their work.
That should at least get you closer to a working result.
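One detail that matters for these steps: Pool.apply() blocks until its call has returned, so a list comprehension of apply() calls hands out tasks one at a time rather than in parallel. Here is a minimal sketch of the same chunking idea using Pool.map(), which gives one section to each worker and returns the results in order. split_into_sections() and count_section() are hypothetical helper names, and the four-row data set is made up for illustration; this is one way to do it, not the only one.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum, maximum):
    """Count how many numbers in `row` lie within [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


def split_into_sections(data, parts):
    """Step 1 (hypothetical helper): split `data` into `parts` contiguous, near-equal slices."""
    size, extra = divmod(len(data), parts)
    sections, start = [], 0
    for i in range(parts):
        # The first `extra` sections get one additional row each
        end = start + size + (1 if i < extra else 0)
        sections.append(data[start:end])
        start = end
    return sections


def count_section(section, minimum=4, maximum=8):
    """Step 2 (hypothetical helper): count every row of one section."""
    return [howmany_within_range(row, minimum, maximum) for row in section]


if __name__ == "__main__":
    data = [[1, 5, 9, 4, 8], [0, 2, 3, 7, 6], [8, 8, 8, 1, 1], [4, 4, 9, 9, 9]]
    sections = split_into_sections(data, 2)

    pool = mp.Pool(2)
    # map() sends one section to each worker and returns the results in order
    per_section = pool.map(count_section, sections)
    # Step 3: close the pool, then join() to wait for the workers to exit
    pool.close()
    pool.join()

    # Flatten back to one count per row
    results = [count for section in per_section for count in section]
    print(results)  # prints [3, 2, 3, 2]
```

Pool.map() also takes a chunksize argument that batches items for you, so splitting by hand mainly helps when each worker should receive one large, contiguous slice.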
#3
Hi Mr,
First, I would like to express my appreciation for your help.
I have followed your suggestions step by step and modified my code to fit your recommendations. The new code is as follows:
import numpy as np
from time import time
import multiprocessing as mp

# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])#print(arr)
data = arr.tolist()
data[:5]
data1=[data[i] for i in range(0,1000)]
data2=[data[i] for i in range(1001,2000)]
data3=[data[i] for i in range(2001,3000)]
data4=[data[i] for i in range(3001,4000)]

# Define the function
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

# Define funcThread
def funcThread(minimum, maximum):
    howmany_within_range(data1, minimum, maximum)
    howmany_within_range(data2, minimum, maximum)
    howmany_within_range(data3, minimum, maximum)
    howmany_within_range(data4, minimum, maximum)

# Block: Parallelization
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(funcThread, args=(4, 8))]
# Step 3: Don't forget to close
pool.close()
pool.join()
# Step 4: Result
print(results[:10])
Running my program gives the same error:

Error:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
    exitcode = _main(fd)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
    prepare(preparation_data)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
    run_name="__mp_main__")
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\109_personel\113_ARIMA_Tmin\Test_3.py", line 33, in <module>
    pool = mp.Pool(mp.cpu_count())
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 119, in Pool
    context=self.get_context())
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 176, in __init__
    self._repopulate_pool()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
    w.start()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
    self._popen = self._Popen(self)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 33, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
    is not going to be frozen to produce an executable.''')
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Are there other modifications I should make to my code to solve this problem?
Thank you very much, Mr; your help is highly appreciated.
#4
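Some more research turned up a few interesting tidbits. First, the if __name__ == "__main__": test is required on Windows when using multiprocessing. The documentation uses it in nearly every example, and its programming guidelines explain why: Windows starts child processes with the spawn method, which imports the main module again in each child, so an unguarded mp.Pool(...) line at module level tries to start new processes from inside every child. That is what the RuntimeError is complaining about.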
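For reference, a minimal skeleton of the guarded layout, using only the standard library: the worker function is defined at module top level so a spawned child can find it by name, and everything that starts processes sits behind the guard. The traceback in the first post shows the child re-running the script with run_name="__mp_main__", which is why code inside the guard is skipped there. square() and main() are hypothetical names.

```python
import multiprocessing as mp


def square(x):
    # Defined at module top level so a spawned child can import it by name
    return x * x


def main():
    # Everything that starts processes lives here, behind the guard below
    pool = mp.Pool(2)
    print(pool.map(square, range(5)))  # prints [0, 1, 4, 9, 16]
    pool.close()
    pool.join()


if __name__ == "__main__":
    # A spawned child re-runs this file under the name "__mp_main__",
    # so this block, and the Pool it creates, runs only in the parent.
    mp.freeze_support()  # no effect unless frozen into a Windows executable
    main()
```

On Linux you can call mp.set_start_method("spawn") as the first statement inside the guard to test with the same start method Windows uses.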

Second, multiprocessing.cpu_count() returns the total number of CPUs, which may be higher than the number of CPUs this process is allowed to use. According to the documentation, len(os.sched_getaffinity(0)) returns the number of usable CPUs. I looked into this because the traceback points at only one line of your script: mp.Pool(mp.cpu_count()).
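If the same script has to run on both Linux and Windows, one option is to call sched_getaffinity() only where it exists and fall back to os.cpu_count() elsewhere. A sketch under that assumption; usable_cpu_count() is a hypothetical helper, not a library function. On Python 3.13 and later, os.process_cpu_count() serves the same purpose.

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Hypothetical helper: CPUs this process may use, with a portable fallback."""
    if hasattr(os, "sched_getaffinity"):
        # Available on Linux and some other Unix systems; respects the
        # process's CPU affinity mask
        return len(os.sched_getaffinity(0))
    # Not available on Windows or macOS; os.cpu_count() may return None
    return os.cpu_count() or 1


if __name__ == "__main__":
    n = usable_cpu_count()
    print("pool size:", n)
    pool = mp.Pool(n)
    print(pool.map(abs, [-1, -2, 3]))  # prints [1, 2, 3]
    pool.close()
    pool.join()
```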

The new function isn't quite what I had in mind; perhaps I didn't explain it clearly. Your version passes each whole section (a list of rows) to howmany_within_range(), which expects a single row. I've refactored it below to show what I meant. I split your data into sections with a list comprehension, which *should* work properly, though the ranges may need a little tweaking.

In any case, give this script a try and cross your fingers:

import os
from time import time
import multiprocessing as mp

import numpy as np

# Prepare data
np.random.seed(100)  # seed the global generator (RandomState(100) on its own has no effect)
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()

# Four sections of 1000 rows each: [0:1000], [1000:2000], [2000:3000], [3000:4000]
sectioned = [data[x:y] for x, y in zip(range(0, 3001, 1000), range(1000, 4001, 1000))]

# Define the function
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

# Apply howmany_within_range() to every row of one section and return the counts
def funcThread(data_section, minimum, maximum):
    return [howmany_within_range(row, minimum, maximum) for row in data_section]

if __name__ == "__main__":  # Required on Windows: child processes re-import this module
    # Block: Parallelization
    # Step 1: Init multiprocessing.Pool() with the number of usable CPUs
    n = len(os.sched_getaffinity(0))
    pool = mp.Pool(n)
    # Step 2: `pool.apply` funcThread() to each section
    results = [pool.apply(funcThread, args=(sec, 4, 8)) for sec in sectioned]
    # Step 3: Don't forget to close and join
    pool.close()
    pool.join()
    # Step 4: Result (per-row counts, flattened across sections)
    counts = [c for section in results for c in section]
    print(counts[:10])
#5
Hi Mr,
Thank you again for your reply; I am really grateful to you. I also tried this code and I am getting the following error:

Error:
C:\Users\lenovo\AppData\Local\Programs\Python\Python37\python.exe C:/109_personel/113_ARIMA_Tmin/Test_6_PyForum_CompleteParalExec.py
Traceback (most recent call last):
  File "C:/109_personel/113_ARIMA_Tmin/Test_6_PyForum_CompleteParalExec.py", line 36, in <module>
    n = len(os.sched_getaffinity(0))
AttributeError: module 'os' has no attribute 'sched_getaffinity'
#6
Lovely. It's only available on some Unix systems, not on Windows. Hmm... Let's try hard-coding the pool size, then:

from time import time
import multiprocessing as mp

import numpy as np

# Prepare data
np.random.seed(100)  # seed the global generator (RandomState(100) on its own has no effect)
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()

# Four sections of 1000 rows each: [0:1000], [1000:2000], [2000:3000], [3000:4000]
sectioned = [data[x:y] for x, y in zip(range(0, 3001, 1000), range(1000, 4001, 1000))]

# Define the function
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

# Apply howmany_within_range() to every row of one section and return the counts
def funcThread(data_section, minimum, maximum):
    return [howmany_within_range(row, minimum, maximum) for row in data_section]

if __name__ == "__main__":  # Required on Windows: child processes re-import this module
    # Block: Parallelization
    # Step 1: Init multiprocessing.Pool() with a hard-coded size
    pool = mp.Pool(4)
    # Step 2: `pool.apply` funcThread() to each section
    results = [pool.apply(funcThread, args=(sec, 4, 8)) for sec in sectioned]
    # Step 3: Don't forget to close and join
    pool.close()
    pool.join()
    # Step 4: Result (per-row counts, flattened across sections)
    counts = [c for section in results for c in section]
    print(counts[:10])
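Even in this version, each pool.apply() call waits for its section to finish before the next one is submitted, so the four sections are still processed one after another. A sketch of one way to run them concurrently, assuming funcThread() returns its per-row counts: submit every section with apply_async() first, then collect the results with get(). run_sections() is a hypothetical helper and the tiny sectioned list is illustrative data.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum, maximum):
    """Count how many numbers in `row` lie within [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


def funcThread(data_section, minimum, maximum):
    """Return the per-row counts for one section of the data."""
    return [howmany_within_range(row, minimum, maximum) for row in data_section]


def run_sections(pool, sectioned, minimum, maximum):
    """Hypothetical helper: submit every section, then collect the results."""
    # apply_async() returns an AsyncResult immediately, so all sections are
    # queued before we wait on any of them
    pending = [pool.apply_async(funcThread, args=(sec, minimum, maximum))
               for sec in sectioned]
    # get() blocks until that section is done; results keep submission order
    return [res.get() for res in pending]


if __name__ == "__main__":
    sectioned = [[[1, 5, 9], [4, 8, 0]], [[7, 7, 2], [3, 3, 3]]]
    pool = mp.Pool(2)
    results = run_sections(pool, sectioned, 4, 8)
    pool.close()
    pool.join()
    print(results)  # prints [[1, 2], [2, 0]]
```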
#7
Thank you Mr,
you have helped me solve my problem.