Ich habe mit dem Schreiben einer benutzerdefinierten Quelle in Java experimentiert. Insbesondere schrieb ich eine Quelle, die Elemente aus einer BlockingQueue nimmt. Ich kenne Source.queue, aber ich weiß nicht, wie man den materialisierten Wert erhält, wenn ich mehrere davon in eine Merge-Stufe verbinde. Wie auch immer, hier ist die Umsetzung:Akka Streams - eine Merge-Phase schiebt manchmal nur dann flussabwärts, wenn alle Upstream-Quellen dorthin geschoben wurden
public class TestingSource extends GraphStage<SourceShape<String>> {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public final Outlet<String> out = Outlet.create("TestingSource.out");
private final SourceShape<String> shape = SourceShape.of(out);
private final BlockingQueue<String> queue;
private final String identifier;
public TestingSource(BlockingQueue<String> queue, String identifier) {
this.queue = queue;
this.identifier = identifier;
}
@Override
public SourceShape<String> shape() {
return shape;
}
@Override
public GraphStageLogic createLogic(Attributes inheritedAttributes) {
return new GraphStageLogic(shape()) {
private AsyncCallback<BlockingQueue<String>> callBack;
{
setHandler(out, new AbstractOutHandler() {
@Override
public void onPull() throws Exception {
String string = queue.poll();
if (string == null) {
System.out.println("TestingSource " + identifier + " no records in queue, invoking callback");
executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream
} else {
System.out.println("TestingSource " + identifier + " found record during pull, pushing");
push(out, string);
}
}
});
}
@Override
public void preStart() {
callBack = createAsyncCallback(queue -> {
String string = null;
while (string == null) {
Thread.sleep(100);
string = queue.poll();
}
push(out, string);
System.out.println("TestingSource " + identifier + " found record during callback, pushed");
});
}
};
}
}
Dieses Beispiel funktioniert, so scheint es, dass meine Quelle ordnungsgemäß funktioniert:
private static void simpleStream() throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Source.fromGraph(new TestingSource(queue, "source"))
.to(Sink.foreach(record -> System.out.println(record)))
.run(materializer);
Thread.sleep(2500);
queue.add("first");
Thread.sleep(2500);
queue.add("second");
}
ich auch ein Beispiel schrieb, dass zwei der Quellen zu einer Merge Stufe verbindet:
private static void simpleMerge() throws InterruptedException {
BlockingQueue<String> queue1 = new LinkedBlockingQueue<>();
BlockingQueue<String> queue2 = new LinkedBlockingQueue<>();
final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
Sink.foreach(record -> System.out.println(record)),
(builder, out) -> {
final UniformFanInShape<String, String> merge =
builder.add(Merge.create(2));
builder.from(builder.add(new TestingSource(queue1, "queue1")))
.toInlet(merge.in(0));
builder.from(builder.add(new TestingSource(queue2, "queue2")))
.toInlet(merge.in(1));
builder.from(merge.out())
.to(out);
return ClosedShape.getInstance();
}));
result.run(materializer);
Thread.sleep(2500);
System.out.println("seeding first queue");
queue1.add("first");
Thread.sleep(2500);
System.out.println("seeding second queue");
queue2.add("second");
}
Manchmal funktioniert dieses Beispiel, wie ich erwar- es nach 5 Sekunden „erste“ druckt und druckt dann „zweite“ nach weiteren 5 Sekunden.
Manchmal (etwa 1 von 5 Läufen) druckt es nach 10 Sekunden "Sekunde" und druckt sofort "zuerst". Mit anderen Worten, die Merge-Stufe schiebt die Strings nur dann nach unten, wenn beide Quellen etwas geschoben haben. Die vollständige Ausgabe sieht wie folgt aus:
TestingSource queue1 no records in queue, invoking callback
TestingSource queue2 no records in queue, invoking callback
seeding first queue
seeding second queue
TestingSource queue2 found record during callback, pushed
second
TestingSource queue2 no records in queue, invoking callback
TestingSource queue1 found record during callback, pushed
first
TestingSource queue1 no records in queue, invoking callback
Dieses Phänomen geschieht häufiger mit MergePreferred und MergePrioritized.
Meine Frage ist - ist dies das richtige Verhalten von Merge? Wenn nicht, was mache ich falsch?
Vielen Dank für Ihre Antwort. Was ich, wenn ich eine beliebige Menge von Quellen verwenden möchte (z. B. habe ich eine "Liste
Um eine beliebige Menge an Quelle zusammenzuführen, schauen Sie in MergeHub. Dokumente hier http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub. Was die blockierenden Bits in deinem Code angeht, hast du einen 'Thread.sleep' als Teil deiner' Prestart'-Funktion, außerdem hast du 'queue.poll' in deinem' onPull'-Callback. Diese blockieren alle und sollten nicht innerhalb einer Grafikstufe aufgerufen werden, es sei denn, Sie führen sie auf einem dedizierten Dispatcher aus. Lies dies für weitere Informationen http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –
Danke, ich habe 'MergeHub' komplett vermisst. Eine letzte Frage- Ich möchte eine ähnliche Funktionalität wie 'MergePrioritized' haben, wobei jede' Quelle' eine andere Priorität hat. Was ist der richtige Weg, dies mit 'MergeHub' zu erreichen? Die Dokumente scheinen es nicht zu bedecken. – akir94