2016-08-11

mpi4py Gatherv facing KeyError: 'O'

I am new to mpi4py. I wrote code to process a large numpy array data on multiple processors. Since I can't provide the input file, I'll describe the shape of data instead: it is [3000000, 15] and it contains string data.

from mpi4py import MPI 
import numpy as np 
import datetime as dt 
import math 


comm = MPI.COMM_WORLD 
numprocs = comm.size 
rank = comm.Get_rank() 
fname = "6.binetflow" 
data = np.loadtxt(open(fname,"rb"), dtype=object, delimiter=",", skiprows=1) 
X = data[:,[0,1,3,14,6,6,6,6,6,6,6,6]] 
num_rows = math.ceil(len(X)/float(numprocs)) 
X = X.flatten() 
sendCounts = list() 
displacements = list() 
for p in range(numprocs): 
    if p == (numprocs-1): # for last processor 
        sendCounts.append(int(len(X) - (p*num_rows*12))) 
        displacements.append(int(p*num_rows*12)) 
        break 
    sendCounts.append(int(num_rows*12)) 
    displacements.append(int(p*sendCounts[p])) 
sendbuf = np.array(X[displacements[rank]: (displacements[rank]+sendCounts[rank])]) 

## Each processor will do some task on sendbuf 

if rank == 0: 
    recvbuf = np.empty(sum(sendCounts), dtype=object) 
else: 
    recvbuf = None 

print("sendbuf: ",sendbuf) 
comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
if rank == 0: 
    print("Gathered array: {}".format(recvbuf)) 

But I get the following error:

Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 525, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34678) 
    File "MPI/msgbuffer.pxi", line 446, in mpi4py.MPI._p_msg_cco.for_cco_send (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:33938) 
    File "MPI/msgbuffer.pxi", line 148, in mpi4py.MPI.message_simple (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:30349) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 
Traceback (most recent call last): 
    File "hello.py", line 36, in <module> 
    comm.Gatherv(sendbuf=sendbuf, recvbuf=(recvbuf, sendCounts), root=0) 
    File "MPI/Comm.pyx", line 602, in mpi4py.MPI.Comm.Gatherv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:97993) 
    File "MPI/msgbuffer.pxi", line 516, in mpi4py.MPI._p_msg_cco.for_gather (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34587) 
    File "MPI/msgbuffer.pxi", line 466, in mpi4py.MPI._p_msg_cco.for_cco_recv (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:34097) 
    File "MPI/msgbuffer.pxi", line 261, in mpi4py.MPI.message_vector (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:31977) 
    File "MPI/msgbuffer.pxi", line 93, in mpi4py.MPI.message_basic (d:\build\mpi4py\mpi4py-2.0.0\src\mpi4py.MPI.c:29448) 
KeyError: 'O' 

Any help is greatly appreciated. I have been stuck on this problem for a long time.

Thanks

Answer

The problem is dtype=object.
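To see where the 'O' in the traceback plausibly comes from: a NumPy object array stores pointers to Python objects, and when it is exported through the buffer protocol its format code is 'O'. Judging from the traceback (the KeyError is raised inside message_basic), mpi4py looks that format code up in its table of MPI datatypes and finds no entry. A minimal check, assuming only NumPy is installed; the sample strings are made up:

```python
import numpy as np

# An object array stores pointers to Python objects, not the raw values.
obj = np.array(["TCP", "UDP"], dtype=object)
print(obj.dtype.char)            # dtype type character
print(memoryview(obj).format)    # format code a buffer consumer such as mpi4py sees

# A primitive dtype exports a format code with a C (and MPI) counterpart.
num = np.zeros(2, dtype=np.float64)
print(memoryview(num).format)
```

The contrast is the point: a format code like the one for float64 maps to a fixed-size C type, whereas 'O' describes Python object pointers, which have no meaning in another process.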

mpi4py provides two kinds of communication functions: those whose names start with an uppercase letter, e.g. Scatter, and those whose names start with a lowercase letter, e.g. scatter. From the mpi4py documentation:

In MPI for Python, the Bcast(), Scatter(), Gather(), Allgather() and Alltoall() methods of Comm instances provide support for collective communications of memory buffers. The variants bcast(), scatter(), gather(), allgather() and alltoall() can communicate generic Python objects.

What isn't clear from this is that, although numpy arrays supposedly expose memory buffers, those buffers apparently need to hold one of a small set of primitive data types, and they certainly do not work with generic objects. Compare the following two pieces of code:

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(Size, dtype=object) 
else: 
    Data = None 

Data = Comm.scatter(Data, 0) # I work fine! 

print("Data on rank %d: " % Rank, Data) 

and

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(Size, dtype=object) 
else: 
    Data = None 

Datb = numpy.empty(1, dtype=object) 

Comm.Scatter(Data, Datb, 0) # I throw KeyError! 

print("Datb on rank %d: " % Rank, Datb) 

Unfortunately, mpi4py does not provide a (lowercase) scatterv. From the same place in the documentation:

The vector variants (which can communicate different amounts of data to each process) Scatterv(), Gatherv(), Allgatherv() and Alltoallv() are also supported, they can only communicate objects exposing memory buffers.

These are no exception to the uppercase vs. lowercase rule for dtypes, either:

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(2*Size+1, dtype=numpy.dtype('float64')) 
else: 
    Data = None 

if Rank == 0: 
    Datb = numpy.empty(3, dtype=numpy.dtype('float64')) 
else: 
    Datb = numpy.empty(2, dtype=numpy.dtype('float64')) 

Comm.Scatterv(Data, Datb, 0) # I work fine! 

print("Datb on rank %d: " % Rank, Datb) 

versus

from mpi4py import MPI 
import numpy 

Comm = MPI.COMM_WORLD 
Size = Comm.Get_size() 
Rank = Comm.Get_rank() 

if Rank == 0: 
    Data = numpy.empty(2*Size+1, dtype=object) 
else: 
    Data = None 

if Rank == 0: 
    Datb = numpy.empty(3, dtype=object) 
else: 
    Datb = numpy.empty(2, dtype=object) 

Comm.Scatterv(Data, Datb, 0) # I throw KeyError! 

print("Datb on rank %d: " % Rank, Datb) 
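The vector variants take per-rank counts and displacements, measured in elements. As a side note, a ceil-based split like the one in the question can produce a negative count for the last rank: 5 rows on 4 processes gives ceil(5/4) = 2 rows per rank, leaving 5 - 3*2 = -1 rows for rank 3. A sketch of a hypothetical helper, split_counts, that spreads the remainder over the first ranks instead:

```python
def split_counts(n_rows, n_procs, row_len=1):
    """Per-rank element counts and displacements for Scatterv/Gatherv.

    Rows are spread as evenly as possible: the first n_rows % n_procs ranks
    get one extra row. Counts and displacements are in elements
    (rows * row_len), not in rows.
    """
    base, extra = divmod(n_rows, n_procs)
    counts = [(base + (1 if r < extra else 0)) * row_len for r in range(n_procs)]
    # Each rank's block starts where the previous rank's block ends.
    displs = [0] * n_procs
    for r in range(1, n_procs):
        displs[r] = displs[r - 1] + counts[r - 1]
    return counts, displs
```

For the float64 Scatterv example with 4 processes, split_counts(2*4 + 1, 4) gives counts [3, 2, 2, 2] and displacements [0, 3, 5, 7], the same layout as the receive buffers allocated there. mpi4py's buffer specifications accept explicit lists of this kind, e.g. [Data, counts, displs, MPI.DOUBLE]; this does not change the dtype restriction, which still applies.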

Unfortunately, you'll have to write your code so that it can use scatter, which requires the same send count for each process, or use more primitive point-to-point communication functions, or use some parallel facility other than mpi4py.
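Another workaround, not from the original answer and only sketched here: since the vector variants need a primitive buffer, the strings can be packed into fixed-width byte fields and sent as uint8. The helpers encoded_width, to_byte_buffer and from_byte_buffer are hypothetical names; the sketch assumes the data are strings (or bytes) and that every rank uses the same field width.

```python
import numpy as np

def _as_bytes(value):
    """UTF-8 bytes for a str (or other object); bytes pass through unchanged."""
    return value if isinstance(value, bytes) else str(value).encode("utf-8")

def encoded_width(obj_array):
    """Length in bytes of the longest encoded element (at least 1)."""
    return max([len(_as_bytes(v)) for v in obj_array] + [1])

def to_byte_buffer(obj_array, width):
    """Pack values into NUL-padded fields of `width` bytes, as a flat uint8 array.

    `width` must be >= encoded_width(obj_array): NumPy silently truncates
    values that do not fit into an 'S<width>' field.
    """
    fixed = np.array([_as_bytes(v) for v in obj_array], dtype="S%d" % width)
    return fixed.view(np.uint8)  # primitive dtype, usable as an MPI buffer

def from_byte_buffer(buf, width):
    """Unpack a uint8 buffer from to_byte_buffer into an object array of str."""
    fixed = np.ascontiguousarray(buf, dtype=np.uint8).view("S%d" % width)
    # Reading an 'S' element strips the trailing NUL padding.
    return np.array([b.decode("utf-8") for b in fixed], dtype=object)
```

In the question's setting, each rank would compute its local width, agree on a common one with comm.allreduce(local_width, op=MPI.MAX), multiply its element counts by that width, and pass the uint8 buffers to Gatherv with a uint8 receive buffer on rank 0. Rank 0 would then call from_byte_buffer on the received bytes and reshape the result to 12 columns. Note that values that were bytes come back as str.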

Using mpi4py 2.0.0, the current stable version at the time of this post.