Python Forum
How to parallelize an 2D array?
Thread Rating:
  • 1 Vote(s) - 4 Average
  • 1
  • 2
  • 3
  • 4
  • 5
How to parallelize an 2D array?
#1
Hi guys,

I need to parallelize my code to improve it's velocity. I tried to understand how to do it reading some internet tutorials, but without success. In order to learn how it works, I wrote some basic code with my necessities, as you can see below:  

#!/usr/bin/env python3

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def vet(n):
   p = np.array([0.]*n)

   for i in range(n):
      p[i] = i
   return p

def subprocess(q,func_enter):

   pool = ProcessPoolExecutor(max_workers=2)
   results = pool.map(func_enter,q)

   return results

def sum_elements(p):

   out1 = np.array([0.])

   A = p[0]
   B = p[1]
   C = p[2]
   D = p[3]

   out1 = A + B + C + D

   return out1


P = vet(8) # creation of 1D array to simulate my real input

Q = P.reshape(2,4) # 2D array like a matrix, every line has the information about one model

U = subprocess(Q,sum_elements) # Parallel call
print(U)
My desired output is an array with sum of the elements of every line of the 2D array Q. In other words, its the output of the function sum_elements like this example:

[5   22]
The sum of the results of one "line" of the array Q with another one, can be a good result too. Example:

[27]
Here's my output at this moment:

itertools.chain object at 0x7fc104ab4d30
Thanks for all the help !!
Reply
#2
Why do you believe you need to parallelize this?

You seem to be using numpy incorrectly/badly so you should probably address that first.
>>> import numpy as np
>>> a = np.array([[1,2,3],[4,5,6],[7,8,9]])
>>> a
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])
>>> np.sum(a) # total sum of array
45
>>> np.sum(a, 0) # sums of each column
array([12, 15, 18])
>>> np.sum(a, 1) # sums of each row
array([ 6, 15, 24])
>>>
Reply
#3
(May-10-2017, 07:08 PM)Mekire Wrote: Why do you believe you need to parallelize this?

You seem to be using numpy incorrectly/badly so you should probably address that first.
>>> import numpy as np
>>> a = np.array([[1,2,3],[4,5,6],[7,8,9]])
>>> a
array([[1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]])
>>> np.sum(a) # total sum of array
45
>>> np.sum(a, 0) # sums of each column
array([12, 15, 18])
>>> np.sum(a, 1) # sums of each row
array([ 6, 15, 24])
>>>
Hi, thanks for your answer.

Let me try to be more clear. The code that I wrote above, its only a simple example to try to learn how to parallelize a situation using an 2D array. The real code that I want to parallelize is much more complex, involving minimization algorithms and another stuffs. But I can't do it, if first don't learn how to do it in a simple example, like the one that I posted in this thread.
Reply
#4
Could you edit your post, or re-post your code, so it has indentation? That way we can try a few things and time them to see which is faster for your use case.

That said, the only "true" parallelization is the multiprocessing module. Anything else might be asynchronous, but it isn't parallel (on cpython, at least).
Reply
#5
(May-10-2017, 09:49 PM)nilamo Wrote: Could you edit your post, or re-post your code, so it has indentation?  That way we can try a few things and time them to see which is faster for your use case.

That said, the only "true" parallelization is the multiprocessing module.  Anything else might be asynchronous, but it isn't parallel (on cpython, at least).

Hi, sorry, I did not realize that the indentation was lost when I pasted my code. I have already edited my first code posted.

I appreciate the help !!
Reply
#6
(May-10-2017, 07:03 PM)Felipe Wrote: itertools.chain object at 0x7fc104ab4d30

Ok, I should have been able to help without even running it.  Call list() on U before printing it.  You've got an itertools.chain object, instead of the actual results, because by default it's lazy-loaded, so it doesn't do more work than needed.  Calling list() forces it to actually get the data.  Changing the bottom of your script to look like this:
if __name__ == "__main__":
   P = vet(8) # creation of 1D array to simulate my real input
   Q = P.reshape(2,4) # 2D array like a matrix, every line has the information about one model
   U = subprocess(Q,sum_elements) # Parallel call
   print(list(U))
Gives this...
Output:
[6.0, 22.0]
Here's some timings, with just map/sum as a benchmark.  We might need more data to show that making it parallel is worth it...
#!/usr/bin/env python3

import numpy as np
from concurrent.futures import ProcessPoolExecutor

def vet(n):
   p = np.array([0.]*n)
   for i in range(n):
      p[i] = i
   return p

def subprocess(q,func_enter):
   pool = ProcessPoolExecutor(max_workers=2)
   results = pool.map(func_enter,q)
   return results

def sum_elements(p):
  # out1 = np.array([0.])
   A = p[0]
   B = p[1]
   C = p[2]
   D = p[3]
   out1 = A + B + C + D
   return out1

def setup():
    P = vet(8) # creation of 1D array to simulate my real input
    Q = P.reshape(2,4) # 2D array like a matrix, every line has the information about one model
    return Q

def felipe():
    data = setup()
    U = subprocess(data, sum_elements) # Parallel call
    return list(U)

def mapsum():
    data = setup()
    return list(map(sum, data))

if __name__ == "__main__":
    assert(felipe() == mapsum())

    import timeit
    funcs = ["felipe", "mapsum"]
    for func in funcs:
        timed = timeit.timeit("{0}()".format(func), number=100, globals=globals())
        print("{0} => {1}".format(func, timed))
Output:
D:\Projects\playground>python temp.py felipe => 19.259355361720637 mapsum => 0.001357432635035849
Reply
#7
(May-11-2017, 05:08 AM)nilamo Wrote:
(May-10-2017, 07:03 PM)Felipe Wrote: itertools.chain object at 0x7fc104ab4d30

Output:
D:\Projects\playground>python temp.py felipe => 19.259355361720637 mapsum => 0.001357432635035849
Thanks for the help. It's a big difference.

Well, I'm trying to improve the velocity of this function that is called several times by the main program:

#!/usr/bin/env python3

import numpy as np

def fwd(p):

# FWD calcula o campo anômalo, sua intensidade e inclinação

  prism, alpha, I = np.loadtxt("modelo.dat", delimiter=' ', usecols=(0,1,2), unpack=True)
  prism = int(prism)

  Tx=np.array([0.]) 
  Tz=np.array([0.]) 

  P=p.reshape(prism,4) 

  X = np.loadtxt("entrada.dat", delimiter=' ', usecols=[0], unpack=True)

  for k in range(prism):

    MDT=P.item(k,0)
    inc=P.item(k,1)
    x0=P.item(k,2)
    D=P.item(k,3)

  cosseno=np.cos(inc*np.pi/180.) 
  seno=np.sin(inc*np.pi/180.) 

  Jx=100*cosseno
  Jz=100*seno
  wx=X-x0
  wn=wx**2+D**2

  Tx=Tx-2*MDT*(Jx*D+Jz*wx)/wn 
  Tz=Tz+2*MDT*(Jz*D-Jx*wx)/wn


  cosseno=np.cos(I*np.pi/180.)
  seno1=np.sin(I*np.pi/180.)
  seno2=np.sin(alpha*np.pi/180.)

  B=np.sqrt(Tx**2+Tz**2)
  A=np.arctan(Tz/Tx)

  Tt=cosseno*seno2*Tx+seno1*Tz 

  f=np.array(np.column_stack((Tt,B,A)))

  np.savetxt('saida.dat',f)

  return f
This function receives an array "p" like the exemples above but with 16 elements. The file "modelo.dat" has these numbers: 4  90  -30
The file "entrada.dat" that I use the first column has the values of X, 306 numbers with equal distances of one to another, going from 0 to 8000. 
As you can see, this function works in a sequential way.

Here's what I tried to parallelize:

#!/usr/bin/env python3

import numpy as np
from multiprocessing import Pool
from functools import partial

def sum_elements(p,X):

  Tx = np.array([0.])
  Tz = np.array([0.])

  MDT = p[0]
  inc = p[1]
  x0 = p[2]
  D = p[3]

  cosseno=np.cos(inc*np.pi/180.) 
  seno=np.sin(inc*np.pi/180.) 

  Jx=100*cosseno
  Jz=100*seno

  wx=X-x0
  wn=wx**2+D**2

  Tx = -2*MDT*(Jx*D+Jz*wx)/wn
  Tz = 2*MDT*(Jz*D-Jx*wx)/wn

  out = np.array(np.column_stack((Tx,Tz)))

  return out

def fwd(p):

# FWD calcula o campo anômalo, sua intensidade e inclinação

  prism, alpha, I = np.loadtxt("modelo.dat", delimiter=' ', usecols=(0,1,2), unpack=True)
  prism = int(prism)

  Tx=np.array([0.]) 
  Tz=np.array([0.]) 

  P=p.reshape(prism,4) 

  x = np.loadtxt("entrada.dat", delimiter=' ', usecols=[0], unpack=True)

  t = Pool()
  saida = np.array(t.map(partial(sum_elements, X=x), P))
  saida2 = saida.sum(axis=0)

  B=np.sqrt(saida2[:,0]**2+saida2[:,1]**2)
  A=np.arctan(saida2[:,1]/saida2[:,0])

  cosseno=np.cos(I*np.pi/180.)
  seno1=np.sin(I*np.pi/180.)
  seno2=np.sin(alpha*np.pi/180.)

  Tt=cosseno*seno2*saida2[:,0]+seno1*saida2[:,1]

  f=np.array(np.column_stack((Tt,B,A)))

  np.savetxt('saida.dat',f)

  t.close()
  t.join()

  return f
The first function gives me an average time 0.005, and my parallel approach gives me an average time 0.1. 
So I failed. The question is, I failed because of my inexperience with parallel coding or because my problem its a case which parallel approach can't be the faster method??
Reply
#8
It depends on how much data there is. Starting processes and passing data to and from them is not free, that takes time. So if there isn't a lot of data (like, a LOT), then the simple task of starting a process will slow your program down.

On the other hand, if this was realtime data, that you needed to process constantly, then you could start looking at having two different programs... one that ran all the time, and kept a pool of worker processes, and another that fed it data as that data becomes available. Then you don't have the slowness of spawning processes, as they're already running in the pool just waiting for more data.
Reply
#9
(May-11-2017, 04:57 PM)nilamo Wrote: It depends on how much data there is.  Starting processes and passing data to and from them is not free, that takes time.  So if there isn't a lot of data (like, a LOT), then the simple task of starting a process will slow your program down.

On the other hand, if this was realtime data, that you needed to process constantly, then you could start looking at having two different programs... one that ran all the time, and kept a pool of worker processes, and another that fed it data as that data becomes available.  Then you don't have the slowness of spawning processes, as they're already running in the pool just waiting for more data.

Well I have a lots of data (3.5 Gb csv file) that I divided in smaller files. It's an inversion problem, which I have a big map made for the csv file, and I selected profiles on this map, that generates me the input files of my program (the one that I trying to parallelize some functions like the previously posted). 

The function that I posted it's called many times by the minimization algorithm. So, because it's an exhaustive and repetitive process, I was wondering if the parallel programing can improve the velocity of convergence and how fast I have and answer from my program. I'm migrating from MatLab, so I know that I can improve my code removing some io's from the inside of this kind of function. Thats wasn't a concerning in MatLab because I used some global variables, but I had needed to make some adjusts to convert to Python. Now I known how to use args in the minimization algorithm, which can do my functions running a bit faster.
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020