2017-03-31 1 views
1

Ich versuche, ein verteiltes Programm zu tun, um einen einfachen Job zu machen (überprüft Primzahlen) und anscheinend das Programm fällt in eine blockierende Wartezeit und ich weiß nicht warum. Es ist ein Producer-Consumer-Programm und muss für jeden Producer-Call oder Consumer-Call einen Thread erstellen. Könnte mir jemand helfen? Ps: Das Problem ist intermittierend. Mein Code ist unten:C++ verteilte Programmblockierung

#include <pthread.h> 
#include <iostream> 
#include <atomic> 
#include <time.h> 
#include <mutex> 
#include <semaphore.h> 
#include <cmath> 

using namespace std; 

#define N 2 //buffer size 
#define Np 1 //number of producer threads 
#define Nc 8 //number of consumer threads 

sem_t full, empty; //semaphores declaration 
pthread_mutex_t bufferbusy, finish; //mutex declaration 
long numbers[N]; //buffer declaration 
int finished = 0; //flag declaration 
int M = 0; //counter declaration 

bool isPrime(long n){ 
    /*checks if long n is prime*/ 
    for (int i = 2; i < sqrt(n) ; i++) 
     if (n % i == 0) 
      return false; 
    return true; 
} 

void *producer(){ 
    /*waits empty semaphore > 1 then puts a number in the buffer (with exclusive access)*/ 
    while(M<10000){ 
     sem_wait(&empty); //WAIT EMPTY 
     pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access 

     for (int i = 0 ; i < N ; i++){ //scrolls the buffer 
      if (numbers[i] == 0){ //if finds an empty buffer's position 
       numbers[i] = rand()%10000001+1; //fit this position with a random number 
       break; //leave the loop 
      }  
     } 

     pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex 
     sem_post(&full); //SIGNAL FULL 
    } 

    pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access 
    finished++; //count finished threads to measure the execution time 
    pthread_mutex_unlock(&finish); //unlock finish mutex 

    pthread_exit(NULL); //finish thread 
} 

void *consumer(){ 
    /*waits full semaphore > 1 then pick up a number from the buffer (with exclusive access) to check if it's prime*/ 
    long data; //store a buffer number 

    while(M<10000){ 
     sem_wait(&full); // WAIT FULL 
     pthread_mutex_lock(&bufferbusy); //locks bufferbusy mutex to ensure exclusive access 

     for (int i = 0 ; i < N ; i++){ //scrolls the buffer 
      if (numbers[i] != 0){ //if finds a fited buffer position 
       data = numbers[i]; //save the number placed at this position 
       numbers[i] = 0; //clear the buffer position 
       M++; //increases the consume counter 
       break; //leave the loop 
      } 
     } 

     pthread_mutex_unlock(&bufferbusy); //unlock bufferbusy mutex 
     sem_post(&empty); //SIGNAL EMPTY 

     if(isPrime(data)); //checks if data number is prime 
      //cout << data << " is prime!" << endl; 
     //else 
      //cout << data << " is composite!" << endl; 
    } 

    pthread_mutex_lock(&finish); //locks finish mutex to ensure exclusive access 
    finished++; //count finished threads to measure the execution time 
    pthread_mutex_unlock(&finish); //unlock finish mutex 

    pthread_exit(NULL); //finish thread 
} 

int main (int argc, char *argv[]){ 
    /*___________________________________VARIABLES___________________________________*/ 
    srand (time(NULL)); //seed to measure the execution time 
    pthread_mutex_init(&bufferbusy,0); //init the bufferbusy mutex with 0 
    pthread_mutex_init(&finish,0); //init the finish mutex with 0 
    sem_init(&full, 0, 0); //init the semaphore full(second parameter means that it is only visible by this process) 
    sem_init(&empty, 0, N); //init the smaphore empty 
    pthread_t threads[Np+Nc]; //threads declaration 
    int rc; //handle errors on thread creating 
    int t; //for loop 
    /*_______________________________________________________________________________*/ 

    /*_____________________________FILL BUFFER WITH ZEROS____________________________*/ 
    for (int i = 0; i < N; i++) //scrolls the buffer 
     numbers[i] = 0; //fill all positions with 0 
    /*_______________________________________________________________________________*/ 

    /*________________________CREATE AND EXECUTE MULTITHREADS________________________*/ 
    clock_t tStart = clock(), tFinish; //start timer 


    for(t=0; t<Np; t++){ //for each Np 
     rc = pthread_create(&threads[t], NULL, producer, NULL); //creates a producer thread 
     if (rc){ //handle error on thread creating 
      cout << "ERROR; return code from pthread_create() is" << rc << endl; 
      exit(-1); 
     } 
    } 

    for(t=Np; t<Np+Nc; t++){ //for each Nc 
     rc = pthread_create(&threads[t], NULL, consumer, NULL); //create a consumer thread 
     if (rc){ //handle error on thread creating 
      cout << "ERROR; return code from pthread_create() is" << rc << endl; 
      exit(-1); 
     } 
    } 

    while (finished < Np+Nc); //blocking wait until all threads have finished 


    tFinish = clock(); //stop timer 

    sem_destroy(&full); //semaphore destructor 
    sem_destroy(&empty); //semaphore destructor 
    pthread_mutex_destroy(&bufferbusy); //mutex destructor 
    pthread_mutex_destroy(&finish); //mutex destructor 

    cout << "Done!" << endl; 
    cout << "Execution time:" << (double)(tFinish - tStart)/CLOCKS_PER_SEC << endl; 
    pthread_exit(NULL); //finish last thread 
    /*_______________________________________________________________________________*/ 
} 

Antwort

0

Es ist bereits arbeiten. Fügen Sie einfach sem_post() hinzu, bevor Sie jeden Thread beenden, damit andere Threads beendet werden können, um das Problem zu beheben.