Python Parallel Programing - wissam1974 - Feb-23-2019
Hi all,
This is my first time writing a parallel program in Python. My program is shown below:
import numpy as np
from time import time
import multiprocessing as mp
# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])  # print(arr)
data = arr.tolist()
data[:5]
# Define and function creation
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count
# Solution 2: Parallelization
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]
# Step 3: Don't forget to close
pool.close()
# Step 4: Result
print(results[:10])
The execution of this program gives the following error:
Error: Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\109_personel\113_ARIMA_Tmin\Test_2.py", line 24, in <module>
pool = mp.Pool(mp.cpu_count())Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 119, in Pool
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
context=self.get_context())
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 176, in __init__
exitcode = _main(fd)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
self._repopulate_pool()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
prepare(preparation_data)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
w.start()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
self._popen = self._Popen(self)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
run_name="__mp_main__")
return Popen(process_obj) File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 33, in __init__
prep_data = spawn.get_preparation_data(process_obj._name)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
pkg_name=pkg_name, script_name=fname)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
_check_not_importing_main()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
Traceback (most recent call last):
File "<string>", line 1, in <module>
mod_name, mod_spec, pkg_name, script_name)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
exec(code, run_globals)
Any help solving this problem would be appreciated.
RE: Python Parallel Programing - stullis - Feb-24-2019
The problem seems to be that your list comprehension is sending a new task to the Pool before the workers in the Pool have completed their tasks. If you have four workers in the pool, you can issue four tasks at a time. Then, you must wait for one of them to finish before issuing subsequent tasks. Essentially, the loop of the comprehension is too fast and isn't waiting.
To fix this, I would refactor to do the following:
- Divide the data into x parts, where x is the number of processors available.
- Create a new function to pass into Pool.apply(). This function needs to take one of the data sections from the first step and then iterate over it, calling howmany_within_range() for each row.
- After calling Pool.close(), call Pool.join() to halt the main thread while the Pool workers do their work.
That should at least get you closer to a working result.
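The chunking steps above can be sketched roughly as follows. This is a minimal illustration rather than the exact code from this thread: `split_into_chunks` and `count_section` are hypothetical helper names, and the short hand-written `rows` list stands in for the real 200000-row data. Note that `pool.apply` blocks until its call returns, so the sketch uses `apply_async` to let the sections run concurrently, and it keeps the pool creation under an `if __name__ == "__main__":` guard because Windows starts each worker by importing the main module again.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum, maximum):
    """Count the numbers in `row` that lie between `minimum` and `maximum`, inclusive."""
    return sum(1 for n in row if minimum <= n <= maximum)


def split_into_chunks(rows, n_chunks):
    """Split `rows` into `n_chunks` contiguous, near-equal parts (hypothetical helper)."""
    size, extra = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` chunks each take one additional row.
        end = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:end])
        start = end
    return chunks


def count_section(section, minimum, maximum):
    """Worker: return one count per row of `section` (hypothetical helper)."""
    return [howmany_within_range(row, minimum, maximum) for row in section]


if __name__ == "__main__":
    rows = [[1, 5, 9], [4, 4, 8], [0, 2, 3], [7, 7, 7], [6, 1, 8]]
    with mp.Pool(2) as pool:
        # apply_async returns immediately; .get() waits for each result.
        pending = [pool.apply_async(count_section, (chunk, 4, 8))
                   for chunk in split_into_chunks(rows, 2)]
        results = [count for p in pending for count in p.get()]
    print(results)
```

Collecting the results inside the `with` block matters here, since leaving the block terminates the pool.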
RE: Python Parallel Programing - wissam1974 - Feb-24-2019
Hi Mr,
First, I would like to express my appreciation for your help.
I followed your suggestions step by step and modified my code to fit your recommendations. The new code is as follows:
import numpy as np
from time import time
import multiprocessing as mp
# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])  # print(arr)
data = arr.tolist()
data[:5]
data1 = [data[i] for i in range(0, 1000)]
data2 = [data[i] for i in range(1001, 2000)]
data3 = [data[i] for i in range(2001, 3000)]
data4 = [data[i] for i in range(3001, 4000)]
# Define and function creation
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count
# Define funcThread
def funcThread(minimum, maximum):
    howmany_within_range(data1, minimum, maximum)
    howmany_within_range(data2, minimum, maximum)
    howmany_within_range(data3, minimum, maximum)
    howmany_within_range(data4, minimum, maximum)
# Block: Parallelization
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(funcThread, args=(4, 8))]
# Step 3: Don't forget to close
pool.close()
pool.join()
# Step 4: Result
print(results[:10])
The execution of my program gives the same error:
Error: Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
exitcode = _main(fd)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
prepare(preparation_data)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
run_name="__mp_main__")
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
pkg_name=pkg_name, script_name=fname)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 96, in _run_module_code
mod_name, mod_spec, pkg_name, script_name)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 85, in _run_code
exec(code, run_globals)
File "C:\109_personel\113_ARIMA_Tmin\Test_3.py", line 33, in <module>
pool = mp.Pool(mp.cpu_count())
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 119, in Pool
context=self.get_context())
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 176, in __init__
Traceback (most recent call last):
File "<string>", line 1, in <module>
self._repopulate_pool()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\pool.py", line 241, in _repopulate_pool
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 105, in spawn_main
w.start()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\process.py", line 112, in start
exitcode = _main(fd)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 114, in _main
self._popen = self._Popen(self)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\context.py", line 322, in _Popen
prepare(preparation_data)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 225, in prepare
return Popen(process_obj)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\popen_spawn_win32.py", line 33, in __init__
prep_data = spawn.get_preparation_data(process_obj._name)
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 143, in get_preparation_data
_fixup_main_from_path(data['init_main_from_path'])
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 277, in _fixup_main_from_path
_check_not_importing_main()
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\multiprocessing\spawn.py", line 136, in _check_not_importing_main
run_name="__mp_main__")
File "C:\Users\lenovo\AppData\Local\Programs\Python\Python37\lib\runpy.py", line 263, in run_path
is not going to be frozen to produce an executable.''')
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
pkg_name=pkg_name, script_name=fname)
Are there other modifications I should make to my code in order to solve this problem?
Thank you very much; your help is highly appreciated.
RE: Python Parallel Programing - stullis - Feb-24-2019
Some more research turned up a few interesting tidbits. First, the if __name__ == "__main__" test is required on Windows machines when using multiprocessing. The documentation uses the test throughout its examples, and the requirement itself is spelled out in its programming guidelines under "Safe importing of main module".
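As a minimal illustration of that test (the `square` function and the pool size of 2 are placeholders, not part of the original script): on Windows, each worker process imports the main module afresh, so any code that creates a pool has to sit under the guard, otherwise every worker would try to create a pool of its own while it is still starting up.

```python
import multiprocessing as mp


def square(x):
    """Placeholder worker; defined at module level so workers can import it."""
    return x * x


def main():
    # Everything that starts processes lives here, not at import time.
    with mp.Pool(2) as pool:
        return pool.map(square, range(5))


if __name__ == "__main__":
    # Runs only in the parent process; spawned workers load this file
    # under a different module name, so they skip this block.
    print(main())
```

Function definitions and imports can stay at module level; only the code that creates processes needs to move under the guard.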
Second, multiprocessing.cpu_count() returns the total number of CPUs, which may be higher than the number of usable CPUs. According to the documentation, len(os.sched_getaffinity(0)) returns the number of usable CPUs. I researched this because the traceback highlights only one line of code in your script: mp.Pool(mp.cpu_count()).
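One caveat is that `os.sched_getaffinity` exists only on some Unix platforms, so a script meant to run on Windows as well needs a fallback. A small sketch of one way to handle that, where `usable_cpu_count` is a hypothetical helper name and falling back to `os.cpu_count()` is an assumption about what is acceptable:

```python
import os


def usable_cpu_count():
    """Return the number of CPUs this process may use, with a portable fallback."""
    if hasattr(os, "sched_getaffinity"):
        # Available on some Unix platforms only (not on Windows).
        return len(os.sched_getaffinity(0))
    # os.cpu_count() can return None when the count cannot be determined.
    return os.cpu_count() or 1


print(usable_cpu_count())
```

The result can then be passed to `mp.Pool(...)` in place of a hard-coded worker count.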
The new function isn't quite what I had in mind; perhaps I didn't explain clearly. I've refactored it below to show what I meant. I sectioned out your data using a list comprehension, which *should* work properly, though the ranges may need a little tweaking.
In any case, give this script a try and cross your fingers:
import os
from time import time
import multiprocessing as mp
import numpy as np
# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])  # print(arr)
data = arr.tolist()
data[:5]
sectioned = [data[x:y] for x, y in zip(range(0, 3001, 1000), range(1000, 4001, 1000))]
# Define and function creation
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count
# Define funcThread
def funcThread(data_section, minimum, maximum):
    for row in data_section:
        howmany_within_range(row, minimum, maximum)
if __name__ == "__main__":  # Required logic expression
    # Block: Parallelization
    # Step 1: Init multiprocessing.Pool()
    n = len(os.sched_getaffinity(0))
    pool = mp.Pool(n)
    # Step 2: `pool.apply` the `howmany_within_range()`
    results = [pool.apply(funcThread, args=(sec, 4, 8)) for sec in sectioned]
    # Step 3: Don't forget to close
    pool.close()
    pool.join()
    # Step 4: Result
    print(results[:10])
RE: Python Parallel Programing - wissam1974 - Feb-24-2019
Hi Mr,
Thank you again for your reply; I am really grateful. I also tried this code, and I am getting the following error:
Error: C:\Users\lenovo\AppData\Local\Programs\Python\Python37\python.exe C:/109_personel/113_ARIMA_Tmin/Test_6_PyForum_CompleteParalExec.py
Traceback (most recent call last):
File "C:/109_personel/113_ARIMA_Tmin/Test_6_PyForum_CompleteParalExec.py", line 36, in <module>
n = len(os.sched_getaffinity(0))
AttributeError: module 'os' has no attribute 'sched_getaffinity'
RE: Python Parallel Programing - stullis - Feb-24-2019
Lovely. os.sched_getaffinity() is only available on certain UNIX systems. Hmm... Let's try hard-coding it then:
from time import time
import multiprocessing as mp
import numpy as np
# Prepare data
np.random.seed(100)  # Seed the global generator; RandomState(100) alone had no effect
arr = np.random.randint(0, 10, size=[200000, 5])  # print(arr)
data = arr.tolist()
data[:5]
sectioned = [data[x:y] for x, y in zip(range(0, 3001, 1000), range(1000, 4001, 1000))]
# Define and function creation
def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count
# Define funcThread
def funcThread(data_section, minimum, maximum):
    # Return the per-row counts so the parent process receives them
    return [howmany_within_range(row, minimum, maximum) for row in data_section]
if __name__ == "__main__":  # Required logic expression
    # Block: Parallelization
    # Step 1: Init multiprocessing.Pool()
    pool = mp.Pool(4)
    # Step 2: `pool.apply` the `funcThread()` to each section
    results = [pool.apply(funcThread, args=(sec, 4, 8)) for sec in sectioned]
    # Step 3: Don't forget to close
    pool.close()
    pool.join()
    # Step 4: Result (first 10 counts from the first section)
    print(results[0][:10])
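For comparison, here is a compact variant of the same computation using `Pool.map`, which distributes the rows across the workers and returns the counts in input order. It is a sketch under assumptions, not code from this thread: `make_data` is a hypothetical helper that uses the standard library's `random` module instead of NumPy to stay dependency-free, the data is smaller, and the `chunksize` of 500 is an arbitrary choice.

```python
import multiprocessing as mp
import random
from functools import partial


def howmany_within_range(row, minimum, maximum):
    """Count the numbers in `row` that lie between `minimum` and `maximum`, inclusive."""
    return sum(1 for n in row if minimum <= n <= maximum)


def make_data(n_rows, seed=100):
    """Build reproducible rows of 5 random integers in [0, 10) (hypothetical helper)."""
    rng = random.Random(seed)
    return [[rng.randrange(10) for _ in range(5)] for _ in range(n_rows)]


def main():
    data = make_data(4000)
    # partial fixes the bounds so map only has to supply each row.
    count_4_to_8 = partial(howmany_within_range, minimum=4, maximum=8)
    with mp.Pool(4) as pool:
        # chunksize batches rows so each task carries more than one row.
        results = pool.map(count_4_to_8, data, chunksize=500)
    print(results[:10])


if __name__ == "__main__":
    main()
```

Unlike `pool.apply`, which handles one call at a time and waits for it, `pool.map` hands out all batches at once and blocks only until the full result list is ready.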
RE: Python Parallel Programing - wissam1974 - Feb-25-2019
Thank you Mr,
you have helped me solve my problem.