2012-03-29 7 views
0

Mein MPI-Programm besteht aus einer Anzahl von Prozessen, die keine oder mehrere Nachrichten von anderen Prozessen senden/empfangen. Die Prozesse überprüfen regelmäßig, ob Nachrichten zur Verarbeitung verfügbar sind. Der Code läuft bis zu 3000 Iterationsschritten. Danach gibt es Deadlocks und Programm friert ein. Bitte zögern Sie nicht, einen Vorschlag zu werfen. Unten ist mein Pseudo-Code ... lass mich wissen ob du irgendeine Frage hast.Bedarf Vorschlag, um Deadlocks mit höheren Iterationsschritten zu vermeiden

N ist die Anzahl der Verarbeitungsknoten:

do{ 

    if(numberIterations>1) -- Receive Data 
    { 
     getdata: 
     MPI_Iprobe() 
     while(flagprobe !=0) 
     { 
      If(TAG=StausUpdate) 
       Update status of processor; 
      If(TAG=Data) 
       Process Data; 
      MPI_Iprobe() 
     } 
    } 

    if(numberIterations< MaxIterations) -- Send Data 
    { 
     for(i=0;i<N;i++) 
      MPI_Bsend_init(request[i]) 

     for(i=0;i<N;i++) 
      MPI_Start(request[i]) 

     numberIterations++; 
    } 

    if(numberIterations == MaxIterations) -- Update Processor Status 
    { 
     for(i=0;i<N;i++) 
      MPI_Isend(request1[i]) -- with TAG = StatusUpdate 

     goto getdata; 
     set endloopflag = 1 
    } 

    if(numberIterations == MaxIterations && endloopflag ==1) --Final Check 
    { 
     for(i=0;i<N;i++) 
      MPI_Test(request1[i],flagtest); 
     if(!flagtest) 
      goto getdata; 
    } 

} while(numberIterations < MaxIterations); 

for(i=0;i<N;i++) --Free request 
{ 
    MPI_Request_free(&request[i]); 
} 

--- Aktualisiert Pseudocode per Mark

#include <string.h> 
#include <math.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <time.h> 
#include <iostream> 
#include <fstream> 
#include "mpi.h" 
#define N 9      //# of nodes 
#define M 10      //samples number 
#define n 2      //demension of weight vector 
#define TAU 0.15 
#define DISTANCE 0.1    //measuremeant for two nodes 
#define A 0.2      //learning rate 
#define ITERATION_STEPS 1000  // Program goes for ITERATION_STEPS - 1 
#define SAMPLE_STEP 1    //Number of current iteration 
#define BT1 0.17 
#define BT2 0.02 
#define A0 0.9     //initial learning rate 
#define AC 0.05     //middle learning rate 
#define AF 0.001     //final learning rate 
#define TC 4500      //first period of iteration 
#define TF 5000     //second period of iteration 

#define BUFSIZE 400000 

using namespace std; 

void printtime(double comm_time,double update_time,string filename,int rank); 
int checkack(int ack[],int status[]); 
int checkstatus(int status[],int procid); 
int noof_activeproc(int status[],int myrank); 
void printresult(double w[][n],string filename,int rank); 
void plot(double w[][n], char* fileName); 
void update(double w[][n], double x[], int t,int rank, int g[][9]); 
double norm(double a[], double b[]); 
double p(double sample[], double w[], int t); 
void OneToTwo (int index, int *row, int *col); 
int g(int b, int j); 

int main(int argc, char *argv[]) 
{ 
     int rank,size;; 
     MPI_Init(&argc,&argv); 
     MPI_Comm_size(MPI_COMM_WORLD, &size); 
     MPI_Comm_rank(MPI_COMM_WORLD, &rank); 
     MPI_Status statusprobe,status,status1[N],status2[N]; 
     MPI_Request request[N],request1[N],request2[N],request3[N]; //N request for N process per iteration 
     double buf[BUFSIZE]; // buffer for the outgoing message; 
     int procstatus[N],ack[N]; //store the process status and ack 
     double temp[n]; 
     double tempsend[n]; 
     ifstream in1, in2, in3; 
     ofstream out,outtime; 
     int i, j,k,z,req = 0,req1; 
     int checklocation=0; //bookmark for MPI_Test  
     int checklocation1=0; //bookmark for MPI_Test  
     int numberIterations; 
     double samples[M][n]; //for all samples 
     double w[N][n]; //for all node weight 
     double x[n]; //one sample 
     int g[9][9]; 
     int count=n; 
     int flagprobe=0; 
     int flagtest=1; 
     int flagrecv=0; 
     int datareadflag; // flag that checks wheter the data is read or not 
     double dataincount; 
     int checktestflag; 
     int requestfreeflag=0; 
     int flag=0; //test flag 
     int flagtest1=0; // check for the request to update the processor status 
     int endloopflag=0; 
     int *bptr, bl; 

     double start_time,end_time,tupdate_start,tupdate_end,t_temp1,t_temp2; 
     double comm_time; 
     double update_time=0; 

     for(i=0;i<N;i++) 
      { 
       procstatus[i]=1; // all the processor are on 
       ack[i]=0; 
      } 

     // read sample data 
     in1.open("samples.dat"); 
     if(!in1) 
     { 
     cout<<"100:File openning error. \n"; 
     exit(100); 
     } 

     in2.open("initialMap.dat"); 
     if(!in2) 
     { 
     cout<<"200:File openning error. \n"; 
     exit(200); 
     } 

     in3.open("gij.dat"); 
     if(!in3) 
     { 
     cout<<"200:File openning error. \n"; 
     exit(200); 
     } 

     for(i=0; i<M; i++) 
      for(j=0; j<n; j++){ 
       in1>>samples[i][j]; 
       //cout<<samples[i][j]<<"="<<i<<","<<j<<" "; 
       } 
     //read initial weights 
     for(i=0; i<N; i++) 
      for(j=0; j<n; j++) { 
      in2>>w[i][j]; 
      //cout<<w[i][j]<<"="<<i<<","<<j<<" "; 
      } 
     //read Gij 
     for(i=0; i<9; i++) 
      for(j=0; j<9; j++) { 
      in3>>g[i][j]; 
      //cout<<w[i][j]<<"="<<i<<","<<j<<" "; 
      } 
     //Print W to file 
      out.open("w.dot"); 
      out<<"graph G {"<<endl; 
      out<<"size=\"10,10\";"<<endl; 
      out<<"ratio=expand;"<<endl; 
      out<<"node [shape=circle];"<<endl; 
      //out<<"node [shape=point];"<<endl; 
      for(i=0; i<9; i++) { 
       for(j=0; j<n; j++) { 
       if(j == 0) out<<i+1<<"[pos = \""; 
       out<<w[i][j]; 
       if(j == 0) out<<","; 
       if(j == 1) out<<"!\"]"<<endl; 
       } 
      } 

      for(i=0; i<9; i++) 
       for(j=0; j<i+1; j++) { 
       if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl; 
       } 
      out<<"}"<<endl; 


     MPI_Barrier(MPI_COMM_WORLD);  
     MPI_Buffer_attach(buf, BUFSIZE); 
     k = 0; 
     numberIterations = 1; 
     dataincount=N; //for the first time , all process or has N data in from file. 
     datareadflag=1; 
     checktestflag=1; 
     int tagno=1; 
     int prevtag; //  start_time=MPI_Wtime(); 
     time_t start,start1,end1,end; 
     time(&start); 
     do{ 
       if(numberIterations%SAMPLE_STEP==0) 
        { 
         t_temp1=MPI_Wtime(); 
         if(k>=M) k=0; 
         for(j=0; j<n; j++) { 
         x[j]=samples[k][j]; 
        } 
         k++; 
        t_temp2=MPI_Wtime(); 
        }   

       if(numberIterations>1) 
       { getdata: 
        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flagprobe, &statusprobe); //tag = numberIterations, 
         while(flagprobe != 0) 
         { 
         if(statusprobe.MPI_TAG==0) // tag=0 means status update of the processor 
          { 
           int rtemp[1]; 
           MPI_Recv(rtemp,1,MPI_INT,statusprobe.MPI_SOURCE,0, MPI_COMM_WORLD, &status);    
           procstatus[status.MPI_SOURCE]=rtemp[0]; 
          } 
          else 
           { 
           datareadflag=1; 
           dataincount++; 
           MPI_Recv(temp,count,MPI_DOUBLE,statusprobe.MPI_SOURCE,statusprobe.MPI_TAG, MPI_COMM_WORLD, &status);    
           for(j=0;j<n;j++) w[status.MPI_SOURCE][j]=temp[j]; 
           } 
         MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG, MPI_COMM_WORLD,&flagprobe, &statusprobe); 
         } //end while 
        } //end if (no of iteration >1) 
       if(numberIterations< ITERATION_STEPS) // do not send on last iterations. 
         { 
          tupdate_start=MPI_Wtime(); 
          update(w,x,k,rank,g); 
          tupdate_end=MPI_Wtime(); 
          update_time=update_time+tupdate_end-tupdate_start;     

          if(req==0) { 
          for(i=0;i<N;i++) 
          { int c=0; 
           if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active 
           { 
           MPI_Bsend_init(w[rank], count, MPI_DOUBLE, i ,tagno, MPI_COMM_WORLD,&request[i]); 
           MPI_Bsend_init(&c,1, MPI_INT,i,0, MPI_COMM_WORLD,&request1[i]); 
           } 
          } //end for 
          req=1; 
          } 

          for(i=0;i<N;i++) 
          { 
           if((i!=rank)&&(checkstatus(procstatus,i)==1)) // send if only the process is active 
           { 

            MPI_Start(&request[i]); 
             //actual message send. 
           } 
          } 

          tagno++; 
          requestfreeflag==1; 
          checktestflag=0; 
          dataincount=0; 
          checklocation=0; 
          numberIterations++; 
          datareadflag=0; 

          cout<<numberIterations<<"-th iterations for . "<<rank<<endl; 
         } //end if(numberIterations< ITERATION_STEPS) 

       /* Before exiting notify all the active process */ 
       if((numberIterations == ITERATION_STEPS) && (endloopflag==0)) //endloop flag prevent sending twice 
        { // status value (initially all 1); 
         req1=0; 
         for(i=0;i<N;i++) 
         { 
          if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active 
          { 
           MPI_Start(&request1[i]); 
          } 
         } 
         endloopflag=1; 
         goto getdata; 
         } //end if 

       if(numberIterations == ITERATION_STEPS && endloopflag==1) 
       { 
        for(i=1;i<N;i++) 
         { 
         if((i!=rank)&&(checkstatus(procstatus,i)==1)) // check if only the process is active 
         { 
          MPI_Test(&request[i], &flagtest, &status); 
          MPI_Test(&request1[i], &flagtest1, &status); 
          if(!flagtest || !flagtest1) 
          { 
           checklocation1=i; //for next check continue from i; 
           cout<<"getdata called by" <<rank<<endl; 
           goto getdata; 
          }    
          } //end if 
         }//end for 
        } //end if 

     } while(numberIterations < ITERATION_STEPS); 

     for(i=0;i<N;i++) 
      { 
      if(i!=rank && request[i]!=MPI_REQUEST_NULL) 
      { 
       MPI_Request_free(&request[i]); 
       MPI_Request_free(&request1[i]); 
      } 
      } 

     if(numberIterations == ITERATION_STEPS) 
      { 
      char pno[2]; 
      sprintf(pno,"%d",rank); 
      string filename; 
      filename=filename+pno; 
      filename=filename+".dot"; 
      char *file=strdup(filename.c_str()); 
      ofstream out; 
      out.open(file); 
       //plot(w, "final_map_25.dat",rank); 
      out<<"graph G {"<<endl; 
      out<<"size=\"10,10\";"<<endl; 
      out<<"ratio=expand;"<<endl; 
      out<<"node [shape=point];"<<endl; 
      //out<<"node [shape=point];"<<endl; 
      for(i=0; i<9; i++) { 
       for(j=0; j<n; j++) { 
       if(j == 0) out<<i+1<<"[pos = \""; 
       out<<w[i][j]; 
       if(j == 0) out<<","; 
       if(j == 1) out<<"!\"]"<<endl; 
       } 
      } 

      for(i=0; i<9; i++) 
       for(j=0; j<i+1; j++) { 
       if(g[i][j] == 1 && i != j) out<<i+1<<" -- "<<j+1<<";"<<endl; 
       } 
      out<<"}"<<endl; 
      } 

     MPI_Buffer_detach(&bptr, &bl); 
     MPI_Finalize(); 
return                                         0; 
} // End main Program 

Antwort

1

Mein Wissen von MPI_Start ist ein bisschen rostig, aber es sollte nicht kombiniert werden mit MPI_Wait (oder eine Variante davon)? Ohne die Wartezeit frage ich mich, ob Ihre Puffer überlaufen, was eine Art Erklärung dafür ist, dass Ihr Programm eine Weile läuft, bevor Sie aufhören. Da Sie keine Art von Fehlermeldungen angezeigt werden, ich fühle mich frei, Ihre Aussage zu interpretieren gibt es Deadlocks und Programm friert ein, um die Situation zu decken, wo das Programm einfriert, weil der Pufferspeicherplatz erschöpft ist. Ob das genau ein Deadlock ist oder nicht, ist strittig.

+0

Ich benutze MPI_Request_free am Ende, um Prozessoren und Puffer freizugeben. Ich dachte, dass Puffer vorher Probleme verursachte, aber dann sieht die Fehlermeldung am Ende wie ein Deadlock aus. "Mpirun bemerkte, dass Prozess 1 mit PID 14656 am Knoten node1082 auf Signal 9 (Killed) beendet wurde." – Naga

+1

Ihr Pseudocode ist pseudokorrekt (wahrscheinlich), ich bin nicht sicher, dass wir ohne Realocode mehr helfen können. –

+0

Ich habe immer noch das gleiche Problem. Das Programm läuft weiter bei MPI_Test (& request [i], & Flagtest, & Status); MPI_Test (& request1 [i], & Flagtest1, & Status); gehe zu getdata; – Naga

Verwandte Themen