2014-01-13 9 views
7

Wie kann ich den Rang eines Prozesses als Tag an die Funktion mpi4py.MPI.COMM_WORLD.Send() übergeben und richtig mit mpi4py.MPI.COMM_WORLD.Recv() empfangen?mpi4py Senden/Recv mit Tag

Ich beziehe mich auf das folgende Codebeispiel für sending and receiving messages between two processes using Send and Recv functions

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

randNum = numpy.zeros(1) 

if rank == 1: 
    randNum = numpy.random.random_sample(1) 
    print "Process", rank, "drew the number", randNum[0] 
    comm.Send(randNum, dest=0) 

if rank == 0: 
    print "Process", rank, "before receiving has the number", randNum[0] 
    comm.Recv(randNum, source=1) 
    print "Process", rank, "received the number", randNum[0] 

ich den Rang des Sendeprozesses als Tag übergeben werden soll, so dass der Empfangsvorgang es, falls es mehrere Absender sind zu identifizieren. Das ist, was ich tue

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

randNum = numpy.zeros(1) 
rnk = -1 # EDIT 

if rank == 1: 
    randNum = numpy.random.random_sample(1) 
    print "Process", rank, "drew the number", randNum[0] 
    comm.Send(randNum, dest=0, tag=rank) # EDIT 

if rank == 0: 
    print "Process", rank, "before receiving has the number", randNum[0] 
    print "Sender rank:", rnk 
    comm.Recv(randNum, 1, rnk) # EDIT 
    print "Process", rank, "received the number", randNum[0] 
    print "Sender rank:", rnk # EDIT 

ich den Wert von RNK erwarten 1 für den Empfangsprozess zu sein (was hat Rang = 0), aber es ist immer noch -1.

Kann mir jemand sagen, was ich hier falsch mache? Vielen Dank!

Antwort

9

Die Funktion Recv speichert eine empfangene Nachricht in einer Variablen. Sie müssen den Rang des erwarteten Absenders angeben. So wissen Sie immer, wer der Absender ist. Eine Schnittstelle zur Nachrichtenweitergabe muss niemals jemanden identifizieren, diese Information ist immer systemintern.

Wenn Sie mehrere Nachrichten vom selben Absender erwarten, können Sie diese mithilfe von Tags unterscheiden. Sie müssen diese Tags selbst liefern, es gibt keine natürliche Möglichkeit, diese zu erhalten. Beschriften Sie die Nachrichten einfach, nummerieren Sie sie.

Wenn Sie ein Tag haben, wird die Funktion Recv nur zurückgegeben, wenn eine Nachricht empfangen wurde, die eine passende Quelle und hat. Dies ist ein blockierender Funktionsaufruf.

In Ihrem Fall tag=-1 ist gleich die universelle Konstante MPI.ANY_TAG (verifizieren über print MPI.ANY_TAG) und damit die Recv werden jeden Tag übernehmen. Aber es wird in keiner Weise seine Eingangsvariable rnk überschreiben. Versuchen Sie rnk = -2 # EDIT und Sie werden sehen.

Sie Ihren Code anders schreiben können, obwohl dies nicht die zugrunde liegende Logik ändern (dh Sie als Programmierer muss der Absender immer wissen) es es nur versteckt, macht es implizit:

#passRandomDraw.py 
import numpy 
from mpi4py import MPI 
comm = MPI.COMM_WORLD 
rank = comm.Get_rank() 

randNum = numpy.zeros(1) 
rnk = -1 # EDIT 

if rank == 1: 
    randNum = numpy.random.random_sample(1) 
    print "Process", rank, "drew the number", randNum[0] 
    comm.Send(randNum, dest=0, tag=rank) # EDIT 

if rank == 0: 
    print "Process", rank, "before receiving has the number", randNum[0] 
    print "Sender rank:", rnk 
    status = MPI.Status() 
    comm.Recv(randNum, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) # EDIT 
    rnk = status.Get_source() 
    print "Process", rank, "received the number", randNum[0] 
    print "Sender rank:", rnk # EDIT 
+0

Das ist sehr detailliert. Vielen Dank! Einige davon wurden klarer, als ich mehr in das Tutorial las. – user1953384

6

Das folgende Beispiel zeigt, wie man die Funktionen send und recv in mpi4py mit Rängen und Tags verwendet. Die gleiche Methode sollte für die Funktionen Send und Recv gelten. Ein Objekt MPI.Status wird verwendet, um die Quelle und das Tag für jede empfangene Nachricht zu erhalten. Wenn die mpi4py-Dokumente nicht ausreichen, ist es oft hilfreich, examples and tutorials written in C zu konsultieren.

from mpi4py import MPI 

def enum(*sequential, **named): 
    """Handy way to fake an enumerated type in Python 
    http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python 
    """ 
    enums = dict(zip(sequential, range(len(sequential))), **named) 
    return type('Enum',(), enums) 

# Define MPI message tags 
tags = enum('READY', 'DONE', 'EXIT', 'START') 

# Initializations and preliminaries 
comm = MPI.COMM_WORLD # get MPI communicator object 
size = comm.Get_size() # total number of processes 
rank = comm.Get_rank() # rank of this process 
name = MPI.Get_processor_name() 
status = MPI.Status() # get MPI status object 

if rank == 0: 
    # Master process executes code below 
    tasks = range(2*size) 
    task_index = 0 
    num_workers = size - 1 
    closed_workers = 0 
    print("Master starting with {} workers".format(num_workers)) 
    while closed_workers < num_workers: 
     data = comm.recv(source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status) 
     source = status.Get_source() 
     tag = status.Get_tag() 
     if tag == tags.READY: 
      # Worker is ready, so send it a task 
      if task_index < len(tasks): 
       comm.send(tasks[task_index], dest=source, tag=tags.START) 
       print("Sending task {} to worker {}".format(task_index, source)) 
       task_index += 1 
      else: 
       comm.send(None, dest=source, tag=tags.EXIT) 
     elif tag == tags.DONE: 
      results = data 
      print("Got data from worker {}".format(source)) 
     elif tag == tags.EXIT: 
      print("Worker {} exited.".format(source)) 
      closed_workers += 1 

    print("Master finishing") 
else: 
    # Worker processes execute code below 
    print("I am a worker with rank {} on {}.".format(rank, name)) 
    while True: 
     comm.send(None, dest=0, tag=tags.READY) 
     task = comm.recv(source=0, tag=MPI.ANY_SOURCE, status=status) 
     tag = status.Get_tag() 

     if tag == tags.START: 
      # Do the work here 
      result = task**2 
      comm.send(result, dest=0, tag=tags.DONE) 
     elif tag == tags.EXIT: 
      break 

    comm.send(None, dest=0, tag=tags.EXIT) 
Verwandte Themen