2017-06-04 2 views
0

Ich schreibe ein einfaches Programm, das URLs aus einer Textdatei liest, und überprüft die Gültigkeit von ihnen mit Multi-Thread-Programmierung. Ich benutzte mutexes und Bedingungsvariablen auf meine Threads zu synchronisieren, aber meine App noch abstürzt und nach einigen Debugsitzungen habe ich beschlossen, etwas Hilfe zu bekommen :)Multi-Threaded Queue unter Linux C

Die Eingabe der Datei ist eine Textdatei mit URLs wie: http://www.youtube.com/ http://www.facebook.com/

und die Ausgabe sollte auf jeder uRL das aggregierte Ergebnis der curl Anfrage (ob es in Ordnung, UNKOWN oder Fehler zurück)

Dies ist mein Code:

/* 
* ex3.c 
*/ 

#define _GNU_SOURCE 

#include <curl/curl.h> 
#include <errno.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <sys/types.h> 
#include <sys/wait.h> 
#include <unistd.h> 


#define REQUEST_TIMEOUT_SECONDS 2L 

#define URL_OK 0 
#define URL_ERROR 1 
#define URL_UNKNOWN 2 

#define QUEUE_SIZE 32 

#define handle_error_en(en, msg) \ 
     do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0) 

typedef struct { 
    int ok, error, unknown; 
} UrlStatus; 


typedef struct { 
    void **array; 
    int size; 
    int capacity; 
    int head; 
    int tail; 
    pthread_mutex_t mutex; 
    pthread_cond_t cv_empty; /* get notified when the queue is not full */ 
    pthread_cond_t cv_full; /* get notified when the queue is not empty */ 
} Queue; 

void queue_init(Queue *queue, int capacity) { 
    /* 
    * Initializes the queue with the specified capacity. 
    * This function should allocate the internal array, initialize its properties 
    * and also initialize its mutex and condition variables. 
    */ 
    queue->array = (void**)malloc(sizeof(void*) * capacity); 
    if (queue->array == NULL) { 
     perror("unable to allocate memory"); 
     exit(EXIT_FAILURE); 
    } 
    queue->capacity = capacity; 
    queue->size = 0; 
    queue->head = 0; 
    queue->tail = 0; 
    pthread_mutex_init(&(queue->mutex), NULL); 
    pthread_cond_init(&(queue->cv_empty), NULL); 
    pthread_cond_init(&(queue->cv_full), NULL); 
} 

void enqueue(Queue *queue, void *data) { 
    /* 
    * Enqueue an object to the queue. 
    * 
    * TODO: 
    * 1. This function should be synchronized on the queue's mutex 
    * 2. If the queue is full, it should wait until it is not full 
    *  (i.e. cv_empty) 
    * 3. Add an element to the tail of the queue, and update the tail & size 
    *  parameters 
    * 4. Signal that the queue is not empty (i.e. cv_full) 
    */ 

    pthread_mutex_lock(&(queue->mutex)); 
    while (queue->size >= QUEUE_SIZE) { 
     pthread_cond_wait(&(queue->cv_empty), &(queue->mutex)); 
    } 

    if(queue->size == 0) { 
     queue->head = 0; 
    } 

    queue->array[queue->tail] = data; 
    queue->size++; 

    queue->tail++; 
    pthread_cond_signal(&(queue->cv_full)); 
    pthread_mutex_unlock(&(queue->mutex)); 

} 

void *dequeue(Queue *queue) { 
    /* 
    * Dequeue an object from the queue. 
    * 
    * TODO: 
    * 1. This function should be synchronized on the queue's mutex 
    * 2. If the queue is empty, it should wait until it is not empty (i.e. cv_full) 
    * 3. Read the head element, and update the head & size parameters 
    * 4. Signal that the queue is not full (i.e. cv_empty) 
    * 5. Return the dequeued item 
    */ 
    void *data; 

    pthread_mutex_lock(&(queue->mutex)); 
    while (queue->size <= 0) { 
     pthread_cond_wait(&(queue->cv_full), &(queue->mutex)); 
    } 

    queue->head++; 
    data = queue->array[queue->head]; 
    queue->size--; 


    pthread_cond_signal(&(queue->cv_empty)); 
    pthread_mutex_unlock(&(queue->mutex)); 


    return data; 
} 

void queue_destroy(Queue *queue) { 
    /* 
    * Free the queue memory and destroy the mutex and the condition variables. 
    */ 
    int ret; 

    free(queue->array); 

    ret = pthread_mutex_destroy(&(queue->mutex)); 
    if (ret != 0) { 
     handle_error_en(ret, "unable to destroy mutex"); 
    } 
    ret = pthread_cond_destroy(&(queue->cv_empty)); 
    if (ret != 0) { 
     handle_error_en(ret, "unable to destroy cv_empty condition variable"); 
    } 
    ret = pthread_cond_destroy(&(queue->cv_full)); 
    if (ret != 0) { 
     handle_error_en(ret, "unable to destroy cv_full condition variable"); 
    } 
} 

void usage() { 
    fprintf(stderr, "usage:\n\t./ex3 FILENAME NUMBER_OF_THREADS\n"); 
    exit(EXIT_FAILURE); 
} 

int count = 0; 
int check_url(const char *url) { 
    CURL *curl; 
    CURLcode res; 
    long response_code = 0L; 
    int http_status = URL_UNKNOWN; 

    curl = curl_easy_init(); 

    if(curl) { 
     curl_easy_setopt(curl, CURLOPT_URL, url); 
     curl_easy_setopt(curl, CURLOPT_TIMEOUT, REQUEST_TIMEOUT_SECONDS); 
     curl_easy_setopt(curl, CURLOPT_NOBODY, 1L); /* do a HEAD request */ 

     res = curl_easy_perform(curl); 
     if(res == CURLE_OK) { 
      res = curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &response_code); 

      if (res == CURLE_OK && 
        response_code >= 200 && 
        response_code < 400) { 
       http_status = URL_OK; 
      } else { 
       http_status = URL_ERROR; 
      } 
     } 
     curl_easy_cleanup(curl); 
    } 


    return http_status; 
} 

typedef struct { 
    Queue *url_queue; 
    Queue *result_queue; 
} WorkerArguments; 

void *worker(void *args) { 
    /* 
    * TODO: 
    * 1. Initialize a UrlStatus (allocate memory using malloc, and zero it 
    *  using memset) 
    * 2. Dequeue URL from url_queue, run check_url on it, and update results 
    * (don't forget to free() the url) 
    * 3. After dequeuing a NULL value: 
    *  Enqueue results to result_queue and return 
    */ 

    WorkerArguments *worker_args = (WorkerArguments *)args; 
    UrlStatus *results = NULL; 
    char *url; 

    results = malloc(sizeof(UrlStatus)); 
    if(results == NULL) { 
     perror("Could not allocate memory"); 
     exit(-1); 
    } 

    memset(results, 0, sizeof(UrlStatus)); 

    while((url = (char *)dequeue(worker_args->url_queue)) != NULL) {  
     switch(check_url(url)) { 
      case URL_OK: 
       results->ok++; 
      break; 
      case URL_UNKNOWN: 
       results->unknown++; 
      break; 
      case URL_ERROR: 
       results->error++; 
      break; 
     } 
     /*free(url);*/ 
    } 
    enqueue(worker_args->result_queue, results); 
    return NULL; 
} 

typedef struct { 
    const char *filename; 
    Queue *url_queue; 
} FileReaderArguments; 

void *file_reader(void *args) { 
    /* 
    * TODO: 
    * 1. Open filename (use fopen, check for errors) 
    * 2. Use getline() to read lines (i.e. URLs) from the file (use errno to check for errors) 
    * 3. Copy each url to the heap (use malloc and strncpy) 
    * 4. Enqueue the URL to url_queue 
    * 5. Don't forget to free the line variable, and close the file (and check for errors!) 
    */ 

    FileReaderArguments *file_reader_args = (FileReaderArguments *)args; 
    FILE *toplist_file; 
    char *line = NULL; 
    char *url = NULL; 
    size_t len = 0; 
    ssize_t read = 0; 

    toplist_file = fopen(file_reader_args->filename, "r"); 


     if (toplist_file == NULL) { 
      exit(EXIT_FAILURE); 
     } 


    while ((read = getline(&line, &len, toplist_file)) != -1) { 
     if (read == -1) { 
      perror("error reading file"); 
     } 
     if(read == 1) continue; /*empty line*/ 
     url = malloc(read); 
     if(url == NULL) { 
      perror("Could not allocate memory"); 
     } 
     line[read-1] = '\0'; /* null-terminate the URL */ 
     strncpy(url, line, read); 
     enqueue(file_reader_args->url_queue, url); 
    } 
    fclose(toplist_file); 
    return NULL; 
} 

typedef struct { 
    int number_of_threads; 
    Queue *url_queue; 
    Queue *result_queue; 
} CollectorArguments; 

void *collector(void *args) { 
    /* 
    * TODO: 
    * 1. Enqueue number_of_threads NULLs to the url_queue 
    * 2. Dequeue and aggregate number_of_threads thread_results 
    *  from result_queue into results (don't forget to free() thread_results) 
    * 3. Print aggregated results to the screen 
    */ 

    CollectorArguments *collector_args = (CollectorArguments *)args; 
    UrlStatus results = {0}; 
    UrlStatus *thread_results; 
    int i; 

    for(i= 0; i < collector_args->number_of_threads; i++) { 
    } 

    for(i= 0; i < collector_args->number_of_threads; i++) { 
     thread_results = dequeue(collector_args->result_queue); 
     results.ok += thread_results->ok; 
     results.error += thread_results->error; 
     results.unknown += thread_results->unknown; 
     /* free(thread_results);*/ 
    } 

    printf("%d OK, %d Error, %d Unknown\n", 
      results.ok, 
      results.error, 
      results.unknown); 
    return NULL; 
} 

void parallel_checker(const char *filename, int number_of_threads) { 
    /* 
    * TODO: 
    * 1. Initialize a Queue for URLs, a Queue for results (use QUEUE_SIZE) 
    * 2. Start number_of_threads threads running worker() 
    * 3. Start a thread running file_reader(), and join it 
    * 4. Start a thread running collector(), and join it 
    * 5. Join all worker threads 
    * 6. Destroy both queues 
    */ 
    Queue url_queue, result_queue; 
    WorkerArguments worker_arguments = {0}; 
    FileReaderArguments file_reader_arguments = {0}; 
    CollectorArguments collector_arguments = {0}; 
    pthread_t *worker_threads; 
    pthread_t file_reader_thread, collector_thread; 
    int i; 
    int err; 


    queue_init(&url_queue, QUEUE_SIZE); 
    queue_init(&result_queue, QUEUE_SIZE); 

    worker_arguments.url_queue = &url_queue; 
    worker_arguments.result_queue = &result_queue; 

    file_reader_arguments.filename = filename; 
    file_reader_arguments.url_queue = &url_queue; 


    collector_arguments.url_queue = &url_queue; 
    collector_arguments.result_queue = &result_queue; 
    collector_arguments.number_of_threads = number_of_threads; 

    worker_threads = (pthread_t *) malloc(sizeof(pthread_t) * number_of_threads); 
    if (worker_threads == NULL) { 
     perror("unable to allocate memory"); 
     return; 
    } 
    curl_global_init(CURL_GLOBAL_ALL); /* init libcurl before starting threads */ 


    for(i=0; i<number_of_threads; i++) { 
     err = pthread_create(&(worker_threads[i]), NULL, &worker, &worker_arguments); 
     if (err != 0) { 
      fprintf(stderr, "can't create thread :[%s]\n", strerror(err)); 
     } 
    } 

    err = pthread_create(&file_reader_thread, NULL, &file_reader, &file_reader_arguments); 
    if (err != 0) { 
     fprintf(stderr, "can't create thread :[%s]\n", strerror(err)); 
    } 

    err = pthread_join(file_reader_thread, NULL); 
    if (err != 0) { 
     fprintf(stderr, "can't join thread :[%s]\n", strerror(err)); 
    } 


    err = pthread_create(&collector_thread, NULL, &collector, &collector_arguments);  
    if (err != 0) { 
     fprintf(stderr, "can't create thread :[%s]\n", strerror(err)); 
    } 


    err = pthread_join(collector_thread, NULL); 
    if (err != 0) { 
     fprintf(stderr, "can't join thread :[%s]\n", strerror(err)); 
    } 

    for(i=0; i<number_of_threads; i++) { 
     err = pthread_join(worker_threads[i], NULL); 

     if (err != 0) { 
      fprintf(stderr, "can't join thread :[%s]\n", strerror(err)); 
     } 
    } 




    queue_destroy(&url_queue); 
    queue_destroy(&result_queue); 
    free(worker_threads); 
} 

int main(int argc, char **argv) { 
    if (argc != 3) { 
     usage(); 
    } else { 
     parallel_checker(argv[1], atoi(argv[2])); 
    } 

    return EXIT_SUCCESS; 
} 

Ich denke, dass ich etwas mit den Synchronisationsmechanismen vermisse, kann jeder erkennen, wo ich falsch lag?

Vielen Dank !!!

+0

Können Sie weitere Informationen hinzufügen? Wo ist das Problem? Was ist die Eingabe und Ausgabe, die Sie haben sollten? Es ist ziemlich groß und nicht so einfach zu verstehen. – BetaRunner

+0

In Funktion Enqueue und Dequeue müssen Sie 'pthread_cond_wait (& (queue-> cv_empty), & (queue-> mutex)); 'in einer while-Schleife nicht innerhalb eines if, weil der Thread fälschlicherweise aufwachen kann. Werfen Sie einen Blick auf Linux Handbuch. – BetaRunner

+0

@BetaRunner, ich habe meinen Post bearbeitet und eine Beispieleingabe hinzugefügt (sie sollte in einer Textdatei sein). Ich habe auch bemerkt, was du über pthread_cond_wait gesagt hast, du hast Recht und ich habe es behoben. Ich werde den Code aktualisieren, aber das Problem bleibt bestehen. der Code hört einfach nicht auf .. es ist fest – user1326293

Antwort

0

Eine Multi-Threading-Logik Fehler sind auf jeden Fall:

Es gibt keinen Unterschied zwischen „Arbeiter-Eingabewarteschlange vorübergehend leer“ und „Datei Leserthread fertig“ - Ihre Arbeiter einfach beendet werden, wenn es nichts zu Lesen Sie (auch vorübergehend) in ihrer Eingabewarteschlange. Wenn also der Thread file_reader Warteschlangeneinträge erzeugt, die langsamer sind als die Arbeiter sie aus irgendeinem Grund konsumieren, werden die Konsumenten verhungern und sterben, so dass der Produzent keinen Verbraucher mehr hat und sich schließlich aufhängt.