2009-11-18 5 views
27

Ich habe eine blockierende Warteschlange von Objekten.java BlockingQueue hat keinen blockierenden Blick?

Ich möchte einen Thread schreiben, dass Blöcke, bis es ein Objekt in der Warteschlange ist. Ähnlich der Funktionalität von BlockingQueue.take().

Da ich jedoch nicht weiß, ob ich in der Lage sein werde, das Objekt erfolgreich zu bearbeiten, möchte ich nur kurz() sehen und das Objekt nicht entfernen. Ich möchte das Objekt nur entfernen, wenn ich es erfolgreich verarbeiten kann.

So möchte ich eine Blockierung peek() Funktion. Derzeit wird peek() nur zurückgegeben, wenn die Warteschlange gemäß den Javadocs leer ist.

Fehle ich etwas? Gibt es eine andere Möglichkeit, diese Funktionalität zu erreichen?

EDIT:

Irgendwelche Gedanken auf, wenn ich nur einen Thread sichere Warteschlange verwendet und guckte und schlief stattdessen?

public void run() { 
    while (!__exit) { 
     while (__queue.size() != 0) { 
      Object o = __queue.peek(); 
      if (o != null) { 
       if (consume(o) == true) { 
        __queue.remove(); 
       } else { 
        Thread.sleep(10000); //need to backoff (60s) and try again 
       } 
      } 
     } 
     Thread.sleep(1000); //wait 1s for object on queue 
    } 
} 

Beachten Sie, dass ich nur einen Consumer-Thread und einen (separaten) Produzenten-Thread habe. Ich denke, das ist nicht so effizient wie mit einer BlockingQueue ... Alle Kommentare geschätzt.

Antwort

14

Sie könnten eine LinkedBlockingDeque und physisch entfernen Sie das Element aus der Warteschlange (mit takeLast()) verwenden, aber es wieder an das Ende der Warteschlange ersetzen, wenn die Verarbeitung putLast(E e) fehl. Inzwischen Ihre „Produzenten“ würden Elemente der Front der Warteschlange putFirst(E e) mit hinzufügen.

Man könnte immer dieses Verhalten in die eigenen Queue Implementierung einkapseln und ein blockingPeek() Verfahren bereitzustellen, das durch putLast() hinter den Kulissen auf den darunterliegenden LinkedBlockingDequetakeLast() gefolgt führt. Daher wird das Element aus Sicht des aufrufenden Clients niemals aus Ihrer Warteschlange entfernt.

+2

Dies ist ein guter Vorschlag. Das einzige Problem, das ich hier sehen kann, ist, dass, wenn die Warteschlange während der Verarbeitung eines Elements voll wird, ich das aktuelle Element nicht in die Warteschlange stellen kann. – rouble

+0

Sie könnten dies umgehen, indem Sie zusätzliche Synchronisation in Ihrer Wrapper-Implementierung verwenden, wodurch Take + eine atomare Operation wird. Sie können auch eine unbegrenzte Warteschlange verwenden. – Adamski

+2

Ich würde empfehlen, nicht zu entfernen und neu hinzufügen, denn dann haben Sie die Mühe, die Statusänderungen der Warteschlange anderen Threads auszusetzen. Verwenden Sie möglicherweise ein beschäftigtes Polling, um den Peek() zu implementieren. Oder verwenden Sie einen Semaphor, der an die Warteschlange in Ihrem Wrapper gebunden ist, wenn Sie nicht abfragen möchten. –

1

Können Sie auch einfach eine Ereignis-Listener-Warteschlange zu Ihrer blockierenden Warteschlange hinzufügen, und wenn etwas zur (blockierenden) Warteschlange hinzugefügt wird, senden Sie ein Ereignis an Ihre Listener? Sie könnten Ihren Thread blockieren lassen, bis die Methode actionPerformed aufgerufen wird.

2

Das einzige, was ich davon bewusst bin, ist diese BlockingBuffer in Apache Commons Collections ist:

Wenn entweder erhalten oder auf einen leeren Puffer, der aufrufende Thread wartet auf Benachrichtigung, dass ein Add genannt entfernen oder addAlle Operation wurde abgeschlossen.

get() entspricht peek() und eine Buffer kann gemacht werden, wie BlockingQueue wirken, indem sie eine UnboundedFifoBuffer mit einem BlockingBuffer

0

Dekoration Sieht aus wie Blocking sich die Funktionalität nicht haben Sie angeben.

Ich könnte versuchen, das Problem ein wenig, obwohl wieder umrahmen: Was würden Sie tun, Sie mit Objekten kann nicht „Prozess richtig“? Wenn du sie einfach in der Warteschlange belässt, musst du sie irgendwann herausziehen und damit fertig werden. Ich würde empfehlen, herauszufinden, wie man sie verarbeitet (normalerweise, wenn eine queue.get() irgendeinen ungültigen oder schlechten Wert gibt, ist es wahrscheinlich in Ordnung, sie einfach auf den Boden fallen zu lassen) oder eine andere Datenstruktur zu wählen ein FIFO.

1

Die schnelle Antwort ist, nicht gibt es nicht wirklich eine Möglichkeit, eine Blockierung Peek, Bar eine Blockierwarteschlange mit einem Blockier-Peek() selbst implementieren.

Fehle ich etwas?

  • Wenn Sie Ihr peek() 'd Nachricht verarbeiten kann - -

peek() kann mit Concurrency lästig sein, es wird in der Warteschlange gelassen werden, es sei denn, Sie mehrere Verbraucher haben.

  • Wer holt das Objekt aus der Warteschlange, wenn Sie es nicht verarbeiten können?
  • Wenn Sie mehrere Verbraucher haben, erhalten Sie eine Race-Bedingung zwischen Ihnen und einem anderen Thread auch Verarbeitung von Elementen, was zu einer doppelten Verarbeitung oder schlimmer.
  • Klingt wie Sie tatsächlich besser dran sein könnte, das Element zu entfernen und verarbeiten sie eine Chain-of-responsibility pattern

    bearbeiten mit: re: Ihr letztes Beispiel: Wenn Sie nur 1 Verbraucher haben, werden Sie nie von der loswerden Objekt in der Warteschlange - es sei denn, es wird in der Zwischenzeit aktualisiert - in diesem Fall sollten Sie sehr vorsichtig mit der Thread-Sicherheit sein und das Objekt wahrscheinlich nicht in die Warteschlange gestellt haben.

    6

    Da ich jedoch nicht weiß, ob ich in der Lage sein werde, das Objekt erfolgreich zu bearbeiten, möchte ich nur kurz() sehen und das Objekt nicht entfernen. Ich möchte das Objekt nur entfernen, wenn ich es erfolgreich verarbeiten kann.

    Im Allgemeinen ist es nicht threadsicher. Was ist, wenn nach Ihnen peek() und feststellen, dass das Objekt erfolgreich verarbeitet werden kann, aber bevor Sie take() es entfernen und verarbeiten, ein anderes Thread dieses Objekt?

    +0

    IMHO können Sie dieses Problem lösen, indem Sie etwas synchronisieren und synchronisieren. Logik – yerlilbilgin