2011-01-02 15 views

Antwort

25

Hier ist ein ich

threadqueue.h

#ifndef _THREADQUEUE_H_ 
#define _THREADQUEUE_H_ 1 

#include <pthread.h> 

#ifdef __cplusplus 
extern "C" { 
#endif 
/** 
* @defgroup ThreadQueue ThreadQueue 
* 
* Little API for waitable queues, typically used for passing messages 
* between threads. 
* 
*/ 

/** 
* @mainpage 
    */ 

/** 
* A thread message. 
* 
* @ingroup ThreadQueue 
* 
* This is used for passing to #thread_queue_get for retreive messages. 
* the date is stored in the data member, the message type in the #msgtype. 
* 
* Typical: 
* @code 
* struct threadmsg; 
* struct myfoo *foo; 
* while(1) 
*  ret = thread_queue_get(&queue,NULL,&message); 
*  .. 
*  foo = msg.data; 
*  switch(msg.msgtype){ 
*    ... 
*  } 
* } 
* @endcode 
* 
*/ 
struct threadmsg{ 
     /** 
     * Holds the data. 
     */ 
     void *data; 
     /** 
     * Holds the messagetype 
     */ 
     long msgtype; 
     /** 
     * Holds the current queue lenght. Might not be meaningful if there's several readers 
     */ 
     long qlength; 

}; 


/** 
* A TthreadQueue 
* 
* @ingroup ThreadQueue 
* 
* You should threat this struct as opaque, never ever set/get any 
* of the variables. You have been warned. 
*/ 
struct threadqueue { 
/** 
* Length of the queue, never set this, never read this. 
* Use #threadqueue_length to read it. 
*/ 
     long length; 
/** 
* Mutex for the queue, never touch. 
*/ 
     pthread_mutex_t mutex; 
/** 
* Condition variable for the queue, never touch. 
*/ 
     pthread_cond_t cond; 
/** 
* Internal pointers for the queue, never touch. 
*/ 
     struct msglist *first,*last; 
/** 
* Internal cache of msglists 
*/ 
    struct msglist *msgpool; 
/** 
* No. of elements in the msgpool 
*/ 
    long msgpool_length; 
}; 

/** 
* Initializes a queue. 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_init initializes a new threadqueue. A new queue must always 
* be initialized before it is used. 
* 
* @param queue Pointer to the queue that should be initialized 
* @return 0 on success see pthread_mutex_init 
*/ 
int thread_queue_init(struct threadqueue *queue); 

/** 
* Adds a message to a queue 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_add adds a "message" to the specified queue, a message 
* is just a pointer to a anything of the users choice. Nothing is copied 
* so the user must keep track on (de)allocation of the data. 
* A message type is also specified, it is not used for anything else than 
* given back when a message is retreived from the queue. 
* 
* @param queue Pointer to the queue on where the message should be added. 
* @param data the "message". 
* @param msgtype a long specifying the message type, choice of the user. 
* @return 0 on succes ENOMEM if out of memory EINVAL if queue is NULL 
*/ 
int thread_queue_add(struct threadqueue *queue, void *data, long msgtype); 

/** 
* Gets a message from a queue 
* 
* @ingroup ThreadQueue 
* 
* thread_queue_get gets a message from the specified queue, it will block 
* the caling thread untill a message arrives, or the (optional) timeout occurs. 
* If timeout is NULL, there will be no timeout, and thread_queue_get will wait 
* untill a message arrives. 
* 
* struct timespec is defined as: 
* @code 
*  struct timespec { 
*     long tv_sec;   // seconds 
*     long tv_nsec;  // nanoseconds 
*    }; 
* @endcode 
* 
* @param queue Pointer to the queue to wait on for a message. 
* @param timeout timeout on how long to wait on a message 
* @param msg pointer that is filled in with mesagetype and data 
* 
* @return 0 on success EINVAL if queue is NULL ETIMEDOUT if timeout occurs 
*/ 
int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg); 


/** 
* Gets the length of a queue 
* 
* @ingroup ThreadQueue 
* 
* threadqueue_length returns the number of messages waiting in the queue 
* 
* @param queue Pointer to the queue for which to get the length 
* @return the length(number of pending messages) in the queue 
*/ 
long thread_queue_length(struct threadqueue *queue); 

/** 
* @ingroup ThreadQueue 
* Cleans up the queue. 
* 
* threadqueue_cleanup cleans up and destroys the queue. 
* This will remove all messages from a queue, and reset it. If 
* freedata is != 0 free(3) will be called on all pending messages in the queue 
* You cannot call this if there are someone currently adding or getting messages 
* from the queue. 
* After a queue have been cleaned, it cannot be used again untill #thread_queue_init 
* has been called on the queue. 
* 
* @param queue Pointer to the queue that should be cleaned 
* @param freedata set to nonzero if free(3) should be called on remaining 
* messages 
* @return 0 on success EINVAL if queue is NULL EBUSY if someone is holding any locks on the queue 
*/ 
int thread_queue_cleanup(struct threadqueue *queue, int freedata); 

#ifdef __cplusplus 
} 
#endif 

#endif 

threadqueue.c

#include <stdlib.h> 
#include <string.h> 
#include <errno.h> 
#include <pthread.h> 
#include <sys/time.h> 
#include "../h/threadqueue.h" 


#define MSGPOOL_SIZE 256 

struct msglist { 
    struct threadmsg msg; 
    struct msglist *next; 
}; 

static inline struct msglist *get_msglist(struct threadqueue *queue) 
{ 
    struct msglist *tmp; 

    if(queue->msgpool != NULL) { 
     tmp = queue->msgpool; 
     queue->msgpool = tmp->next; 
     queue->msgpool_length--; 
    } else { 
     tmp = malloc(sizeof *tmp); 
    } 

    return tmp; 
} 

static inline void release_msglist(struct threadqueue *queue,struct msglist *node) 
{ 

    if(queue->msgpool_length > (queue->length/8 + MSGPOOL_SIZE)) { 
     free(node); 
    } else { 
     node->msg.data = NULL; 
     node->msg.msgtype = 0; 
     node->next = queue->msgpool; 
     queue->msgpool = node; 
     queue->msgpool_length++; 
    } 
    if(queue->msgpool_length > (queue->length/4 + MSGPOOL_SIZE*10)) { 
     struct msglist *tmp = queue->msgpool; 
     queue->msgpool = tmp->next; 
     free(tmp); 
     queue->msgpool_length--; 
    } 
} 

int thread_queue_init(struct threadqueue *queue) 
{ 
    int ret = 0; 
    if (queue == NULL) { 
     return EINVAL; 
    } 
    memset(queue, 0, sizeof(struct threadqueue)); 
    ret = pthread_cond_init(&queue->cond, NULL); 
    if (ret != 0) { 
     return ret; 
    } 

    ret = pthread_mutex_init(&queue->mutex, NULL); 
    if (ret != 0) { 
     pthread_cond_destroy(&queue->cond); 
     return ret; 
    } 

    return 0; 

} 

int thread_queue_add(struct threadqueue *queue, void *data, long msgtype) 
{ 
    struct msglist *newmsg; 
    pthread_mutex_lock(&queue->mutex); 
    newmsg = get_msglist(queue); 
    if (newmsg == NULL) { 
     pthread_mutex_unlock(&queue->mutex); 
     return ENOMEM; 
    } 
    newmsg->msg.data = data; 
    newmsg->msg.msgtype = msgtype; 

    newmsg->next = NULL; 
    if (queue->last == NULL) { 
     queue->last = newmsg; 
     queue->first = newmsg; 
    } else { 
     queue->last->next = newmsg; 
     queue->last = newmsg; 
    } 

     if(queue->length == 0) 
       pthread_cond_broadcast(&queue->cond); 
    queue->length++; 
    pthread_mutex_unlock(&queue->mutex); 

    return 0; 

} 

int thread_queue_get(struct threadqueue *queue, const struct timespec *timeout, struct threadmsg *msg) 
{ 
    struct msglist *firstrec; 
    int ret = 0; 
    struct timespec abstimeout; 

    if (queue == NULL || msg == NULL) { 
     return EINVAL; 
    } 
    if (timeout) { 
     struct timeval now; 

     gettimeofday(&now, NULL); 
     abstimeout.tv_sec = now.tv_sec + timeout->tv_sec; 
     abstimeout.tv_nsec = (now.tv_usec * 1000) + timeout->tv_nsec; 
     if (abstimeout.tv_nsec >= 1000000000) { 
      abstimeout.tv_sec++; 
      abstimeout.tv_nsec -= 1000000000; 
     } 
    } 

    pthread_mutex_lock(&queue->mutex); 

    /* Will wait until awakened by a signal or broadcast */ 
    while (queue->first == NULL && ret != ETIMEDOUT) { //Need to loop to handle spurious wakeups 
     if (timeout) { 
      ret = pthread_cond_timedwait(&queue->cond, &queue->mutex, &abstimeout); 
     } else { 
      pthread_cond_wait(&queue->cond, &queue->mutex); 

     } 
    } 
    if (ret == ETIMEDOUT) { 
     pthread_mutex_unlock(&queue->mutex); 
     return ret; 
    } 

    firstrec = queue->first; 
    queue->first = queue->first->next; 
    queue->length--; 

    if (queue->first == NULL) { 
     queue->last = NULL;  // we know this since we hold the lock 
     queue->length = 0; 
    } 


    msg->data = firstrec->msg.data; 
    msg->msgtype = firstrec->msg.msgtype; 
     msg->qlength = queue->length; 

    release_msglist(queue,firstrec); 
    pthread_mutex_unlock(&queue->mutex); 

    return 0; 
} 

//maybe caller should supply a callback for cleaning the elements ? 
int thread_queue_cleanup(struct threadqueue *queue, int freedata) 
{ 
    struct msglist *rec; 
    struct msglist *next; 
    struct msglist *recs[2]; 
    int ret,i; 
    if (queue == NULL) { 
     return EINVAL; 
    } 

    pthread_mutex_lock(&queue->mutex); 
    recs[0] = queue->first; 
    recs[1] = queue->msgpool; 
    for(i = 0; i < 2 ; i++) { 
     rec = recs[i]; 
     while (rec) { 
      next = rec->next; 
      if (freedata) { 
       free(rec->msg.data); 
      } 
      free(rec); 
      rec = next; 
     } 
    } 

    pthread_mutex_unlock(&queue->mutex); 
    ret = pthread_mutex_destroy(&queue->mutex); 
    pthread_cond_destroy(&queue->cond); 

    return ret; 

} 

long thread_queue_length(struct threadqueue *queue) 
{ 
    long counter; 
    // get the length properly 
    pthread_mutex_lock(&queue->mutex); 
    counter = queue->length; 
    pthread_mutex_unlock(&queue->mutex); 
    return counter; 

} 
+0

gibt es viele Situationen, in denen 'thread_queue_length()' kann verwendet werden, in einer nicht rassistischen Art und Weise? – caf

+0

Nein. Es hängt von Ihren Bedürfnissen ab, thread_queue_length gibt Ihnen jetzt die Länge in der Warteschlange, unabhängig davon, wie viele neue Nachrichten in der letzten Nanosekunde hinzugefügt oder verarbeitet wurden. Ich benutze es einfach für die periodische Protokollierung der Warteschlangengröße. – nos

+0

'add' und' get' verwenden beide den gleichen Mutex. wenn 'get' den Mutex erwirbt und auf' broadcast' von 'add' wartet ... führt das nicht zu einem Deadlock, da add den Mutex nicht sperren konnte? – Mike