2016-05-25 9 views
0

Ich versuche, einen SingleBlockingQueue<T> Synchronizer zu erstellen, der einen Thread zu offer() ein Element zu ihm und einen anderen Thread, der take() es ermöglicht. Nur ein T Element wird innerhalb der SingleBlockingQueue<T> gleichzeitig gehalten, und der schiebende Thread ist blockiert auf offer(), wenn das vorherige Element auf den Thread zu take() wartet. Der schiebende Thread schiebt Elemente weiter, bis er setComplete() aufruft, und der Thread, der den Thread nimmt, ruft weiterhin take() auf, während falsch ist. Der Threading-Thread wird blockiert, wenn er auf ein Element wartet.Erstellen eines SingleBlockingQueue Synchronizers

Hier ist der Synchronizer, den ich bisher habe.

import java.util.concurrent.atomic.AtomicBoolean; 

public final class SingleBlockingQueue<T> { 

    private volatile T value; 
    private final AtomicBoolean isComplete = new AtomicBoolean(false); 
    private final AtomicBoolean isPresent = new AtomicBoolean(false); 

    public void offer(T value) throws InterruptedException { 
     while (isPresent.get()) { 
      this.wait(); 
     } 
     this.value = value; 
     synchronized(this) { 
      this.notifyAll(); 
     } 
    } 
    public boolean isComplete() { 
     return !isPresent.get() && isComplete.get(); 
    } 
    public void setComplete() { 
     isComplete.set(true); 
    } 
    public T take() throws InterruptedException { 
     while (!isPresent.get()) { 
      this.wait(); 
     } 
     T returnValue = value; 
     isPresent.set(false); 
     synchronized(this) { 
      this.notifyAll(); 
     } 
     return returnValue; 
    } 
} 

Hier ist ein Anwendungsbeispiel in Kotlin

val queue = SingleBlockingQueue<Int>() 

    thread { 
     for (i in 1..1000) { 
      queue.offer(i) 
     } 
     queue.setComplete() 
    } 

    thread { 
     while (!queue.isComplete) { 
      println(queue.take()) 
     } 
    } 

    Thread.sleep(100000) 

aber ich erhalte eine Fehlermeldung, und ich bin ein bisschen in mir über den Kopf an dieser Stelle. Ich habe seit langer Zeit dank RxJava keine Synchronisierungen gemacht. Was genau mache ich falsch?

Exception in thread "Thread-1" java.lang.IllegalMonitorStateException 
    at java.lang.Object.wait(Native Method) 
    at java.lang.Object.wait(Object.java:502) 
    at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33) 
    at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8) 
    at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18) 
+3

Sollte nicht warten() innerhalb eines synchronisierten Blocks aufgerufen werden? –

+0

Ich habe das versucht, hatte aber dieselben Probleme. Vielleicht muss ich zwei unabhängige Sperren erstellen? Habe das auch nicht für diesen Synchronizer gemacht, den ich vor einiger Zeit geschrieben habe und hatte keine Probleme. https://github.com/thomasnield/tom-sync/blob/master/src/main/java/org/nield/concurrency/BufferedLatch.java – tmn

+0

dass github-code den gleichen fehler ergibt, wenn ich direkt apply anrufe() [Just versucht in meiner lokalen Sonnenfinsternis]. Möglicherweise haben Sie eine Sperre auf dem Monitor mit diesem Objekt vor dem Aufruf von await() verbunden. –

Antwort

1

Wie andere darauf hingewiesen haben, könnten Sie die vorhandene Implementierung in SynchronousQueue verwenden.

Wenn Sie Ihre eigenen implementieren möchten, sind Sie ziemlich nah, müssen Sie nur sicherstellen, dass die Aufrufe an wait() in dem synchronized Block sind.

Leider glaube ich, die isComplete()/setComplete() Mechanismus in Ihrem ursprünglichen Code ist zu einem Rennen Bedingung, wie setComplete() genannt werden kann, nachdem isComplete()false und vor oder sogar während des Lesefaden take() ausführt zurückgekehrt ist. Dies würde möglicherweise den Lese-Thread hängen.

public final class SingleBlockingQueue<T> { 
    private final Object lock = new Object(); 
    private T value; 
    private boolean present = false; 

    public void offer(T value) throws InterruptedException { 
     synchronized (lock) { 
     while (present) 
      lock.wait(); 
     this.value = value; 
     present = true; 
     lock.notifyAll(); 
     } 
    } 

    public T take() throws InterruptedException { 
     synchronized (lock) { 
     while (!present) 
      lock.wait(); 
     T returnValue = value; 
     value = null; // Should release reference 
     present = false; 
     lock.notifyAll(); 
     return returnValue; 
     } 
    } 
    } 

Zum Vergleich kann es natürlich sein, diese Art von Warteschlange zu implementieren, basierend auf Semaphore oder Condition Objekten. Hier ist eine Implementierung, die ein Paar Semaphoren verwendet, um leere/volle Bedingungen zu signalisieren.

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore full = new Semaphore(0); 
    private final Semaphore empty = new Semaphore(1); 

    public void offer(T value) throws InterruptedException { 
     empty.acquire(); 
     this.value = value; 
     full.release(); 
    } 

    public T take() throws InterruptedException { 
     full.acquire(); 
     T returnValue = value; 
     value = null; // Should release reference 
     empty.release(); 
     return returnValue; 
    } 
    } 
+0

Der 'Semaphore'-Ansatz ist schön einfach, danke. Eine letzte Frage. Wie kann ich sicher kommunizieren, dass es keine weiteren Artikel gibt? Ich habe den Bejeeus mit einem 'AtomicBoolean' auf Client-Seite getestet, aber ich bin immer noch vorsichtig. – tmn

+0

Hier ist meine Verwendung in Kotlin. Obwohl ich es nicht beweisen konnte, mache ich mir Sorgen, dass der letzte Elementaufruf auf 'take()' übersprungen werden könnte https://gist.github.com/thomasnield/a3f7981ea447e0c049ba5943afa44fb8#file-singleblockingqueue-usage-kt – tmn

+0

Ha, bewies meine Sorge war richtig. Ich habe meinen Grundgedanken mit einem 'sleep()' und einem Kommentar aktualisiert, der zeigt, wo dies entgleisen könnte. – tmn

2

Sie brauchen nicht es selbst zu implementieren Sie SynchronousQueue

Referenzen verwenden:

SynchronousQueue javadoc

http://tutorials.jenkov.com/java-util-concurrent/synchronousqueue.html

Die SynchronousQueue Klasse implementiert die Blocking Schnittstelle. Lesen Sie den BlockingQueue-Text, um weitere Informationen zur Schnittstelle zu erhalten.

Die SynchronousQueue ist eine Warteschlange, die intern nur ein einzelnes Element enthalten kann. Ein Thread, der ein Element in die Warteschlange einfügt, wird blockiert bis ein anderes Thread dieses Element aus der Warteschlange nimmt. Wenn ein Thread versucht, ein Element zu übernehmen, und kein Element derzeit vorhanden ist, wird dieser Thread blockiert, bis ein Thread ein Element in die Warteschlange einfügt.

+1

Die Javadoc-Beschreibung von 'SynchronousQueue' ist irreführend, sie hat nicht die Kapazität um ein Element zu speichern: Die Kapazität ist _zero_. Die Beschreibung des Verhaltens ist der Schlüssel: Der Produzententhread kann nicht einfach ein Element "setzen"() und weggehen. Jeder 'q.put (e)' Anruf ist blockiert, bis ein Verbraucher 'q.take()' aufruft. Dies bewirkt, dass sich 'SynchronousQueue' anders verhält als das, was das OP beschrieben hat. (Natürlich, ob es anders ist als das, was das OP eigentlich _wants_ ist, ist eine ganz andere Frage.) –

+0

Ja, ich möchte das 'put()' blockieren, wenn es auf ein 'take()' wartet, um das einzelne Element zu entfernen und befreie den Slot. – tmn

+1

Tatsächlich sagt die Dokumentation im Java8 JDK: "Eine synchrone Warteschlange hat keine interne Kapazität, nicht einmal eine Kapazität von eins." (Hervorhebung, meins.) –

0

Nur ein Hinweis hatte ich einige Probleme mit den ResultSet nach vorne aufgrund von Timing next() Anrufe in RxJava-JDBC Rahmen springen. Ich ging mit dieser Implementierung, um die vorher gegebenen Antworten zu modifizieren. Diese

public final class SingleBlockingQueue<T> { 
    private volatile T value; 
    private final Semaphore nextGate = new Semaphore(0); 
    private final Semaphore waitGate = new Semaphore(0); 

    private volatile boolean hasValue = true; 
    private volatile boolean isFirst = true; 

    public void offer(T value) throws InterruptedException { 
     if (isFirst) { 
      nextGate.acquire(); 
      isFirst = false; 
     } 
     this.value = value; 
     waitGate.release(); 
     nextGate.acquire(); 
    } 

    public T take() throws InterruptedException { 
     T returnValue = value; 
     value = null; // Should release reference 
     return returnValue; 
    } 
    public boolean next() throws InterruptedException { 
     nextGate.release(); 
     waitGate.acquire(); 
     return hasValue; 
    } 
    public void setDone() { 
     hasValue = false; 
     waitGate.release(); 
    } 
} 

ist, was ich benutze es für: eine RxJava Observable<T> in eine Sequence<T> in Kotlin drehen.

import com.github.davidmoten.rx.jdbc.QuerySelect 
import rx.Observable 
import rx.Scheduler 
import rx.lang.kotlin.subscribeWith 
import java.io.Closeable 

class ObservableIterator<T>(
     observable: Observable<T> 
) : Iterator<T>, Closeable { 

    private val queue = SingleBlockingQueue<T>() 

    private val subscription = 
      observable 
        .subscribeWith { 
         onNext { queue.offer(it) } 
         onCompleted { queue.setDone() } 
         onError { queue.setDone() } 
        } 

    override fun hasNext(): Boolean { 
     return queue.next() 
    } 

    override fun next(): T { 
     return queue.take() 
    } 
    override fun close() { 
     subscription.unsubscribe() 
     queue.setDone() 
    } 
} 

fun <T> Observable<T>.asSequence() = ObservableIterator(this).asSequence() 

fun QuerySelect.Builder.asSequence(scheduler: Scheduler) = get { it } 
     .subscribeOn(scheduler) 
     .asSequence() 
+1

Beachten Sie, dass die Subskription für die Observable für 'OnNext' für immer blockiert wäre, wenn' Sequence' nicht zurückgegeben wird. – Ilya