2017-09-14 1 views
0

Ich habe mit dem Schreiben einer benutzerdefinierten Quelle in Java experimentiert. Insbesondere schrieb ich eine Quelle, die Elemente aus einer BlockingQueue nimmt. Ich kenne Source.queue, aber ich weiß nicht, wie man den materialisierten Wert erhält, wenn ich mehrere davon in eine Merge-Stufe verbinde. Wie auch immer, hier ist die Umsetzung:Akka Streams - eine Merge-Phase schiebt manchmal nur dann flussabwärts, wenn alle Upstream-Quellen dorthin geschoben wurden

public class TestingSource extends GraphStage<SourceShape<String>> { 
    private static final ExecutorService executor = Executors.newCachedThreadPool(); 

    public final Outlet<String> out = Outlet.create("TestingSource.out"); 
    private final SourceShape<String> shape = SourceShape.of(out); 

    private final BlockingQueue<String> queue; 
    private final String identifier; 

    public TestingSource(BlockingQueue<String> queue, String identifier) { 
     this.queue = queue; 
     this.identifier = identifier; 
    } 

    @Override 
    public SourceShape<String> shape() { 
     return shape; 
    } 

    @Override 
    public GraphStageLogic createLogic(Attributes inheritedAttributes) { 
     return new GraphStageLogic(shape()) { 
      private AsyncCallback<BlockingQueue<String>> callBack; 

      { 
       setHandler(out, new AbstractOutHandler() { 
        @Override 
        public void onPull() throws Exception { 
         String string = queue.poll(); 
         if (string == null) { 
          System.out.println("TestingSource " + identifier + " no records in queue, invoking callback"); 
          executor.submit(() -> callBack.invoke(queue)); // necessary, otherwise blocks upstream 
         } else { 
          System.out.println("TestingSource " + identifier + " found record during pull, pushing"); 
          push(out, string); 
         } 
        } 
       }); 
      } 

      @Override 
      public void preStart() { 
       callBack = createAsyncCallback(queue -> { 
        String string = null; 
        while (string == null) { 
         Thread.sleep(100); 
         string = queue.poll(); 
        } 
        push(out, string); 
        System.out.println("TestingSource " + identifier + " found record during callback, pushed"); 
       }); 
      } 
     }; 
    } 
} 

Dieses Beispiel funktioniert, so scheint es, dass meine Quelle ordnungsgemäß funktioniert:

private static void simpleStream() throws InterruptedException { 
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(); 
    Source.fromGraph(new TestingSource(queue, "source")) 
      .to(Sink.foreach(record -> System.out.println(record))) 
      .run(materializer); 

    Thread.sleep(2500); 
    queue.add("first"); 

    Thread.sleep(2500); 
    queue.add("second"); 
} 

ich auch ein Beispiel schrieb, dass zwei der Quellen zu einer Merge Stufe verbindet:

private static void simpleMerge() throws InterruptedException { 
    BlockingQueue<String> queue1 = new LinkedBlockingQueue<>(); 
    BlockingQueue<String> queue2 = new LinkedBlockingQueue<>(); 

    final RunnableGraph<?> result = RunnableGraph.fromGraph(GraphDSL.create(
      Sink.foreach(record -> System.out.println(record)), 
      (builder, out) -> { 
       final UniformFanInShape<String, String> merge = 
         builder.add(Merge.create(2)); 
       builder.from(builder.add(new TestingSource(queue1, "queue1"))) 
         .toInlet(merge.in(0)); 
       builder.from(builder.add(new TestingSource(queue2, "queue2"))) 
         .toInlet(merge.in(1)); 

       builder.from(merge.out()) 
         .to(out); 
       return ClosedShape.getInstance(); 
      })); 
    result.run(materializer); 

    Thread.sleep(2500); 
    System.out.println("seeding first queue"); 
    queue1.add("first"); 

    Thread.sleep(2500); 
    System.out.println("seeding second queue"); 
    queue2.add("second"); 
} 

Manchmal funktioniert dieses Beispiel, wie ich erwar- es nach 5 Sekunden „erste“ druckt und druckt dann „zweite“ nach weiteren 5 Sekunden.

Manchmal (etwa 1 von 5 Läufen) druckt es nach 10 Sekunden "Sekunde" und druckt sofort "zuerst". Mit anderen Worten, die Merge-Stufe schiebt die Strings nur dann nach unten, wenn beide Quellen etwas geschoben haben. Die vollständige Ausgabe sieht wie folgt aus:

TestingSource queue1 no records in queue, invoking callback 
TestingSource queue2 no records in queue, invoking callback 
seeding first queue 
seeding second queue 
TestingSource queue2 found record during callback, pushed 
second 
TestingSource queue2 no records in queue, invoking callback 
TestingSource queue1 found record during callback, pushed 
first 
TestingSource queue1 no records in queue, invoking callback 

Dieses Phänomen geschieht häufiger mit MergePreferred und MergePrioritized.

Meine Frage ist - ist dies das richtige Verhalten von Merge? Wenn nicht, was mache ich falsch?

Antwort

0

Auf den ersten Blick scheint das Blockieren des Threads mit einem Thread.sleep in der Mitte der Bühne zumindest eines der Probleme zu sein.

Wie auch immer, ich denke, es wäre viel einfacher zu verwenden Source.queue, wie Sie am Anfang Ihrer Frage erwähnen. Wenn das Problem ist die materialisierte Wert zu extrahieren, wenn die GraphDSL verwenden, hier ist, wie Sie es tun:

final Source<String, SourceQueueWithComplete<String>> source = Source.queue(100, OverflowStrategy.backpressure()); 
    final Sink<Object, CompletionStage<akka.Done>> sink = Sink.ignore(); 

    final RunnableGraph<Pair<SourceQueueWithComplete<String>, CompletionStage<akka.Done>>> g = 
      RunnableGraph.fromGraph(
        GraphDSL.create(
          source, 
          sink, 
          Keep.both(), 
          (b, src, snk) -> { 
           b.from(src).to(snk); 
           return ClosedShape.getInstance(); 
          } 
        ) 
      ); 

    g.run(materializer); // this gives you back the queue 

Weitere Informationen hierzu finden Sie im docs.

+0

Vielen Dank für Ihre Antwort. Was ich, wenn ich eine beliebige Menge von Quellen verwenden möchte (z. B. habe ich eine "Liste "), und verbinden sie mit einer Merge-Stufe? Wie kann ich alle ihre Warteschlangen bekommen? Außerdem ist 'Thread.sleep' im Hauptthread, warum würde es den Stream beeinflussen? – akir94

+0

Um eine beliebige Menge an Quelle zusammenzuführen, schauen Sie in MergeHub. Dokumente hier http://doc.akka.io/docs/akka/2.5/scala/stream/stream-dynamic.html#using-the-mergehub. Was die blockierenden Bits in deinem Code angeht, hast du einen 'Thread.sleep' als Teil deiner' Prestart'-Funktion, außerdem hast du 'queue.poll' in deinem' onPull'-Callback. Diese blockieren alle und sollten nicht innerhalb einer Grafikstufe aufgerufen werden, es sei denn, Sie führen sie auf einem dedizierten Dispatcher aus. Lies dies für weitere Informationen http://doc.akka.io/docs/akka/2.5/scala/dispatchers.html#blocking-needs-careful-management –

+0

Danke, ich habe 'MergeHub' komplett vermisst. Eine letzte Frage- Ich möchte eine ähnliche Funktionalität wie 'MergePrioritized' haben, wobei jede' Quelle' eine andere Priorität hat. Was ist der richtige Weg, dies mit 'MergeHub' zu erreichen? Die Dokumente scheinen es nicht zu bedecken. – akir94

Verwandte Themen