Posts: 44
Threads: 17
Joined: Jan 2017
May-10-2017, 07:03 PM
(This post was last modified: May-10-2017, 10:09 PM by Felipe.)
Hi guys,
I need to parallelize my code to improve its speed. I tried to understand how to do it by reading some internet tutorials, but without success. In order to learn how it works, I wrote some basic code matching my needs, as you can see below:
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def vet(n):
    p = np.array([0.] * n)
    for i in range(n):
        p[i] = i
    return p

def subprocess(q, func_enter):
    pool = ProcessPoolExecutor(max_workers=2)
    results = pool.map(func_enter, q)
    return results

def sum_elements(p):
    out1 = np.array([0.])
    A = p[0]
    B = p[1]
    C = p[2]
    D = p[3]
    out1 = A + B + C + D
    return out1

P = vet(8)
Q = P.reshape(2, 4)
U = subprocess(Q, sum_elements)
print(U)
My desired output is an array with the sum of the elements of every row of the 2D array Q; in other words, the outputs of the function sum_elements (for the Q above, that would be [6.0, 22.0]).
The sum of the results of one "row" of the array Q with another one would also be an acceptable result.
Here's my output at this moment:
Output: <itertools.chain object at 0x7fc104ab4d30>
Thanks for all the help !!
Posts: 591
Threads: 26
Joined: Sep 2016
Why do you believe you need to parallelize this?
You seem to be using numpy incorrectly/badly so you should probably address that first.
>>> import numpy as np
>>> a = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
>>> a
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])
>>> np.sum(a)
45
>>> np.sum(a, 0)
array([12, 15, 18])
>>> np.sum(a, 1)
array([ 6, 15, 24])
>>>
Posts: 44
Threads: 17
Joined: Jan 2017
(May-10-2017, 07:08 PM)Mekire Wrote: Why do you believe you need to parallelize this?
You seem to be using numpy incorrectly/badly so you should probably address that first.
Hi, thanks for your answer.
Let me try to be clearer. The code that I wrote above is only a simple example to learn how to parallelize work over a 2D array. The real code that I want to parallelize is much more complex, involving minimization algorithms and other things. But I can't do that before I first learn how to do it in a simple example, like the one I posted in this thread.
Posts: 3,458
Threads: 101
Joined: Sep 2016
Could you edit your post, or re-post your code, so it has indentation? That way we can try a few things and time them to see which is faster for your use case.
That said, the only "true" parallelization is the multiprocessing module. Anything else might be asynchronous, but it isn't parallel (on cpython, at least).
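For reference, here is a minimal sketch of what using the multiprocessing module looks like; the square function and the numbers are just placeholders, not anything from Felipe's code:

# Minimal multiprocessing sketch: square some numbers in worker processes.
# Everything here is illustrative; the function and data are placeholders.
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":           # guard is required when spawning processes
    with Pool(processes=2) as pool:  # two worker processes
        print(pool.map(square, range(10)))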
Posts: 44
Threads: 17
Joined: Jan 2017
(May-10-2017, 09:49 PM)nilamo Wrote: Could you edit your post, or re-post your code, so it has indentation? That way we can try a few things and time them to see which is faster for your use case.
That said, the only "true" parallelization is the multiprocessing module. Anything else might be asynchronous, but it isn't parallel (on cpython, at least).
Hi, sorry, I did not realize that the indentation was lost when I pasted my code. I have already edited my first post.
I appreciate the help !!
Posts: 3,458
Threads: 101
Joined: Sep 2016
(May-10-2017, 07:03 PM)Felipe Wrote: <itertools.chain object at 0x7fc104ab4d30>
Ok, I should have been able to help without even running it. Call list() on U before printing it. You've got an itertools.chain object, instead of the actual results, because by default it's lazy-loaded, so it doesn't do more work than needed. Calling list() forces it to actually get the data. Changing the bottom of your script to look like this:
if __name__ == "__main__":
    P = vet(8)
    Q = P.reshape(2, 4)
    U = subprocess(Q, sum_elements)
    print(list(U))
Gives this...
Output: [6.0, 22.0]
Here are some timings, with a plain map/sum as a benchmark. We might need more data to show that making it parallel is worth it...
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def vet(n):
    p = np.array([0.] * n)
    for i in range(n):
        p[i] = i
    return p

def subprocess(q, func_enter):
    pool = ProcessPoolExecutor(max_workers=2)
    results = pool.map(func_enter, q)
    return results

def sum_elements(p):
    A = p[0]
    B = p[1]
    C = p[2]
    D = p[3]
    out1 = A + B + C + D
    return out1

def setup():
    P = vet(8)
    Q = P.reshape(2, 4)
    return Q

def felipe():
    data = setup()
    U = subprocess(data, sum_elements)
    return list(U)

def mapsum():
    data = setup()
    return list(map(sum, data))

if __name__ == "__main__":
    assert felipe() == mapsum()

    import timeit
    funcs = ["felipe", "mapsum"]
    for func in funcs:
        timed = timeit.timeit("{0}()".format(func), number=100, globals=globals())
        print("{0} => {1}".format(func, timed))
Output: D:\Projects\playground>python temp.py
felipe => 19.259355361720637
mapsum => 0.001357432635035849
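To put the overhead point in code, here is a rough sketch of a bigger benchmark, where each row involves more work so the process startup and pickling cost has a chance to be amortized. The sizes, the heavy_row stand-in, and the chunksize are arbitrary guesses, not tuned values, so whether the parallel version actually wins depends on the machine and on how heavy the real per-row work is.

# Rough sketch of a larger benchmark: many rows of non-trivial work, so the
# cost of starting processes and shipping data can be amortized.
import numpy as np
from concurrent.futures import ProcessPoolExecutor

def heavy_row(row):
    # stand-in for per-row work that is actually expensive
    return float(np.sqrt(row).sum())

def serial(data):
    return [heavy_row(row) for row in data]

def parallel(data):
    # chunksize batches several rows per task to reduce inter-process traffic
    with ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(heavy_row, data, chunksize=256))

if __name__ == "__main__":
    data = np.random.rand(20000, 500)   # arbitrary "large-ish" input
    assert np.allclose(serial(data), parallel(data))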
Posts: 44
Threads: 17
Joined: Jan 2017
May-11-2017, 01:44 PM
(This post was last modified: May-11-2017, 01:46 PM by Felipe.)
(May-11-2017, 05:08 AM)nilamo Wrote: (May-10-2017, 07:03 PM)Felipe Wrote: <itertools.chain object at 0x7fc104ab4d30>
Output: D:\Projects\playground>python temp.py
felipe => 19.259355361720637
mapsum => 0.001357432635035849
Thanks for the help. It's a big difference.
Well, I'm trying to improve the speed of this function, which is called several times by the main program:
import numpy as np

def fwd(p):
    prism, alpha, I = np.loadtxt("modelo.dat", delimiter=' ', usecols=(0, 1, 2), unpack=True)
    prism = int(prism)
    Tx = np.array([0.])
    Tz = np.array([0.])
    P = p.reshape(prism, 4)
    X = np.loadtxt("entrada.dat", delimiter=' ', usecols=[0], unpack=True)
    for k in range(prism):
        MDT = P.item(k, 0)
        inc = P.item(k, 1)
        x0 = P.item(k, 2)
        D = P.item(k, 3)
        cosseno = np.cos(inc * np.pi / 180.)
        seno = np.sin(inc * np.pi / 180.)
        Jx = 100 * cosseno
        Jz = 100 * seno
        wx = X - x0
        wn = wx**2 + D**2
        Tx = Tx - 2 * MDT * (Jx * D + Jz * wx) / wn
        Tz = Tz + 2 * MDT * (Jz * D - Jx * wx) / wn
    cosseno = np.cos(I * np.pi / 180.)
    seno1 = np.sin(I * np.pi / 180.)
    seno2 = np.sin(alpha * np.pi / 180.)
    B = np.sqrt(Tx**2 + Tz**2)
    A = np.arctan(Tz / Tx)
    Tt = cosseno * seno2 * Tx + seno1 * Tz
    f = np.array(np.column_stack((Tt, B, A)))
    np.savetxt('saida.dat', f)
    return f
This function receives an array "p" like the examples above, but with 16 elements. The file "modelo.dat" has these numbers: 4 90 -30
From the file "entrada.dat" I use the first column, which holds the values of X: 306 equally spaced numbers going from 0 to 8000.
As you can see, this function works sequentially.
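For anyone who wants to run fwd() without the real data, here is a sketch that writes stand-in "modelo.dat" and "entrada.dat" files matching the description above; the values are placeholders for testing only:

# Sketch: generate stand-in input files so fwd() can be run for testing.
import numpy as np

# modelo.dat: one line with "4 90 -30" (prism count, alpha, I)
np.savetxt("modelo.dat", np.array([[4., 90., -30.]]), delimiter=' ')

# entrada.dat: a single column with 306 equally spaced X values from 0 to 8000
X = np.linspace(0., 8000., 306)
np.savetxt("entrada.dat", X.reshape(-1, 1), delimiter=' ')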
Here's what I tried to parallelize:
import numpy as np
from multiprocessing import Pool
from functools import partial

def sum_elements(p, X):
    Tx = np.array([0.])
    Tz = np.array([0.])
    MDT = p[0]
    inc = p[1]
    x0 = p[2]
    D = p[3]
    cosseno = np.cos(inc * np.pi / 180.)
    seno = np.sin(inc * np.pi / 180.)
    Jx = 100 * cosseno
    Jz = 100 * seno
    wx = X - x0
    wn = wx**2 + D**2
    Tx = -2 * MDT * (Jx * D + Jz * wx) / wn
    Tz = 2 * MDT * (Jz * D - Jx * wx) / wn
    out = np.array(np.column_stack((Tx, Tz)))
    return out

def fwd(p):
    prism, alpha, I = np.loadtxt("modelo.dat", delimiter=' ', usecols=(0, 1, 2), unpack=True)
    prism = int(prism)
    Tx = np.array([0.])
    Tz = np.array([0.])
    P = p.reshape(prism, 4)
    x = np.loadtxt("entrada.dat", delimiter=' ', usecols=[0], unpack=True)
    t = Pool()
    saida = np.array(t.map(partial(sum_elements, X=x), P))
    saida2 = saida.sum(axis=0)
    B = np.sqrt(saida2[:, 0]**2 + saida2[:, 1]**2)
    A = np.arctan(saida2[:, 1] / saida2[:, 0])
    cosseno = np.cos(I * np.pi / 180.)
    seno1 = np.sin(I * np.pi / 180.)
    seno2 = np.sin(alpha * np.pi / 180.)
    Tt = cosseno * seno2 * saida2[:, 0] + seno1 * saida2[:, 1]
    f = np.array(np.column_stack((Tt, B, A)))
    np.savetxt('saida.dat', f)
    t.close()
    t.join()
    return f
The first function gives me an average time of 0.005, and my parallel approach gives me an average time of 0.1.
So I failed. The question is: did I fail because of my inexperience with parallel coding, or because my problem is a case where a parallel approach can't be the faster method?
Posts: 3,458
Threads: 101
Joined: Sep 2016
It depends on how much data there is. Starting processes and passing data to and from them is not free, that takes time. So if there isn't a lot of data (like, a LOT), then the simple task of starting a process will slow your program down.
On the other hand, if this was realtime data, that you needed to process constantly, then you could start looking at having two different programs... one that ran all the time, and kept a pool of worker processes, and another that fed it data as that data becomes available. Then you don't have the slowness of spawning processes, as they're already running in the pool just waiting for more data.
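A very rough sketch of that idea: a pool that is started once and then fed batches of work as they arrive. process_chunk and get_chunks are made-up placeholders here, not part of any real program in this thread:

# Rough sketch of a long-lived worker pool, created once and reused for
# every batch, so the process-startup cost is paid only once.
from multiprocessing import Pool

def process_chunk(chunk):
    # stand-in for the real per-batch computation
    return sum(chunk)

def get_chunks():
    # stand-in for a real data feed (files, a queue, a socket, ...)
    for start in range(0, 100, 10):
        yield list(range(start, start + 10))

if __name__ == "__main__":
    with Pool(processes=4) as pool:   # started once, reused for every batch
        for result in pool.imap(process_chunk, get_chunks()):
            print(result)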
Posts: 44
Threads: 17
Joined: Jan 2017
(May-11-2017, 04:57 PM)nilamo Wrote: It depends on how much data there is. Starting processes and passing data to and from them is not free, that takes time. So if there isn't a lot of data (like, a LOT), then the simple task of starting a process will slow your program down.
On the other hand, if this was realtime data, that you needed to process constantly, then you could start looking at having two different programs... one that ran all the time, and kept a pool of worker processes, and another that fed it data as that data becomes available. Then you don't have the slowness of spawning processes, as they're already running in the pool just waiting for more data.
Well, I have a lot of data (a 3.5 GB CSV file) that I divided into smaller files. It's an inversion problem: I have a big map built from the CSV file, and I select profiles on this map, which generate the input files of my program (the one whose functions, like the one posted previously, I'm trying to parallelize).
The function that I posted is called many times by the minimization algorithm. So, because it's an exhaustive and repetitive process, I was wondering whether parallel programming can improve the speed of convergence and how fast I get an answer from my program. I'm migrating from MATLAB, so I know that I can improve my code by removing some I/O from inside this kind of function; that wasn't a concern in MATLAB because I used some global variables, but I needed to make some adjustments to convert to Python. Now I know how to use args in the minimization algorithm, which should make my functions run a bit faster.
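As a sketch of that "remove the I/O" idea (just a suggestion, not tested against the real minimizer): load the files once, outside the function the minimizer calls repeatedly, and pass the values in as arguments, for example through the args parameter of scipy.optimize.minimize.

import numpy as np

def fwd(p, X, prism, alpha, I):
    # same body as the sequential fwd() above, minus the np.loadtxt calls
    ...

# load once, before the minimization starts
prism, alpha, I = np.loadtxt("modelo.dat", delimiter=' ', usecols=(0, 1, 2), unpack=True)
prism = int(prism)
X = np.loadtxt("entrada.dat", delimiter=' ', usecols=[0], unpack=True)

# hypothetical call: the extra values travel through args=(...)
# result = scipy.optimize.minimize(misfit, p0, args=(X, prism, alpha, I))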