Ich versuche, einen SingleBlockingQueue<T>
Synchronizer zu erstellen, der einen Thread zu offer()
ein Element zu ihm und einen anderen Thread, der take()
es ermöglicht. Nur ein T
Element wird innerhalb der SingleBlockingQueue<T>
gleichzeitig gehalten, und der schiebende Thread ist blockiert auf offer()
, wenn das vorherige Element auf den Thread zu take()
wartet. Der schiebende Thread schiebt Elemente weiter, bis er setComplete()
aufruft, und der Thread, der den Thread nimmt, ruft weiterhin take()
auf, während falsch ist. Der Threading-Thread wird blockiert, wenn er auf ein Element wartet.Erstellen eines SingleBlockingQueue Synchronizers
Hier ist der Synchronizer, den ich bisher habe.
import java.util.concurrent.atomic.AtomicBoolean;
public final class SingleBlockingQueue<T> {
private volatile T value;
private final AtomicBoolean isComplete = new AtomicBoolean(false);
private final AtomicBoolean isPresent = new AtomicBoolean(false);
public void offer(T value) throws InterruptedException {
while (isPresent.get()) {
this.wait();
}
this.value = value;
synchronized(this) {
this.notifyAll();
}
}
public boolean isComplete() {
return !isPresent.get() && isComplete.get();
}
public void setComplete() {
isComplete.set(true);
}
public T take() throws InterruptedException {
while (!isPresent.get()) {
this.wait();
}
T returnValue = value;
isPresent.set(false);
synchronized(this) {
this.notifyAll();
}
return returnValue;
}
}
Hier ist ein Anwendungsbeispiel in Kotlin
val queue = SingleBlockingQueue<Int>()
thread {
for (i in 1..1000) {
queue.offer(i)
}
queue.setComplete()
}
thread {
while (!queue.isComplete) {
println(queue.take())
}
}
Thread.sleep(100000)
aber ich erhalte eine Fehlermeldung, und ich bin ein bisschen in mir über den Kopf an dieser Stelle. Ich habe seit langer Zeit dank RxJava keine Synchronisierungen gemacht. Was genau mache ich falsch?
Exception in thread "Thread-1" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:502)
at com.swa.rm.common.util.SingleBlockingQueue.take(SingleBlockingQueue.java:29)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:33)
at RxOperatorTest$testSingleBlockingQueue$2.invoke(RxOperatorTest.kt:8)
at kotlin.concurrent.ThreadsKt$thread$thread$1.run(Thread.kt:18)
Sollte nicht warten() innerhalb eines synchronisierten Blocks aufgerufen werden? –
Ich habe das versucht, hatte aber dieselben Probleme. Vielleicht muss ich zwei unabhängige Sperren erstellen? Habe das auch nicht für diesen Synchronizer gemacht, den ich vor einiger Zeit geschrieben habe und hatte keine Probleme. https://github.com/thomasnield/tom-sync/blob/master/src/main/java/org/nield/concurrency/BufferedLatch.java – tmn
dass github-code den gleichen fehler ergibt, wenn ich direkt apply anrufe() [Just versucht in meiner lokalen Sonnenfinsternis]. Möglicherweise haben Sie eine Sperre auf dem Monitor mit diesem Objekt vor dem Aufruf von await() verbunden. –