2017-06-01 1 views
0

-Update zu versenken: Ich habe meine Frage in test project zu erklären, was ich im Detail bedeutenAkka Strom Schiebefenster emittieren zu steuern reduce von SourceQueue

============== ================================================= =====

Ich habe Akka-Quelle, die contiune aus der Datenbanktabelle lesen, und groupby einige Schlüssel dann reduzieren. Allerdings scheint es, nachdem ich reduzieren Funktion anwenden, die Daten nie zu sinken, wird es kontinuierlich reduzieren, da Upstream immer Daten kommen.

Ich lese einige Post und versuchte gruppiertWithin und gleitend, aber es funktioniert nicht, wie ich dachte, es nur die Nachricht zu größeren Teil gruppieren, aber nie die Upstream-Pause und emittieren, um zu sinken. Im Anschluss wird der Code in Akka Strom 2.5.2

Die Quelle reduzieren Code:

source = source 
    .groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS)) 
    .sliding(3, 1) 
    .mapConcat(i -> i) 
    .mapConcat(i -> i) 
    .groupBy(2000000, i -> i.getEntityName()) 
    .map(i -> new Pair<>(i.getEntityName(), i)) 
    .reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;}) 
    .map(i -> i.second()) 
    .mergeSubstreams(); 

The Sink und laufen:

Sink<Object, CompletionStage<Done>> sink = 
     Sink.foreach(i -> System.out.println(i)) 
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left()); 
run.run(materIalizer); 

Ich habe auch versucht .takeWhile (prädizierten); Ich verwende den Timer, um den Prädikat-Wert true und false zu wechseln, aber es scheint, dass nur der erste Schalter auf false gesetzt wird, wenn ich zurück auf true umschalte, ist es kein Neustart stromaufwärts.

Bitte helfen Sie mir, danke im Voraus!

============================================== ===

Update für

Informationen über die Art der Elemente

hinzufügen, was ich will: habe ich Klasse Anruf SystemCodeTracking enthält 2 Attribute (id, entityName)

werde ich Liste des Objekts hat: (1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")

Ich mag entityName GROUPBY dann die ID Summe ist daher das Ergebnis Ich mag würde, um zu sehen folgende

("table1" 1+4),("table3", 3+5),("table2", 2) 

Der Code, den ich jetzt tue, ist

folgende
source 
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName) 
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId())) 
.scan(....) 

meine Frage jetzt ist mehr darüber, wie Scan inital Staat aufzubauen soll ich tun?

scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId())) 
+1

Warum setzen Sie 'mapConcat' nach Ihrer Gruppierung fließt? –

+0

Weil groupedWithin und sliding mir eine Liste von Objekten geben, aber ich brauche nur eins nach dem anderen zu reduzieren. Soll ich die ganze Liste reduzieren? – zt1983811

+1

Ja. Sie sollten einen der beiden Werte (gleitend oder gruppiert) verwenden und dann jedes Listenelement reduzieren. Ich glaube, was Sie wirklich wollen, ist gruppiertWithin, es ist genau ein Zeitfenster (mit maximaler Größe). –

Antwort

1

Also, was Sie wollen, wenn ich alles gut verstehen ist:

  • erste, Gruppe von ID
  • dann Gruppe durch Zeitfenster, und innerhalb dieses Zeitfensters Summe alle systemCodeTracking.getId()

Für den ersten Teil benötigen Sie groupBy.Für den zweiten Teil groupedWithin. Sie funktionieren jedoch nicht gleich: Der erste gibt Ihnen Unterläufe, während der zweite Ihnen einen Fluss von Listen gibt.

Daher müssen wir sie anders behandeln.

Lassen Sie uns zunächst einen Minderer für Ihre Listen schreiben:

private SystemCodeTracking reduceList(List<SystemCodeTracking> list) throws Exception { 
    if (list.isEmpty()) { 
     throw new Exception(); 
    } else { 
     SystemCodeTracking building = list.get(0); 
     building.setId(0L); 
     list.forEach(next -> building.setId(building.getId() + next.getId())); 
     return building; 
    } 
} 

So für jedes Element in der Liste, erhöhen wir den building.id den Wert wollen wir erhalten, wenn die ganze Liste durchquert hat worden.

Jetzt müssen Sie tun nur

FiniteDuration sec = FiniteDuration.apply(1, TimeUnit.SECONDS) 
Source<SystemCodeTracking, SourceQueueWithComplete<SystemCodeTracking>> loggedSource = source 
    .groupBy(20000, SystemCodeTracking::getEntityName) // group by name 
    .groupedWithin(100, FiniteDuration.create(10, TimeUnit.SECONDS) // for a given name, group by time window (or by packs of 100) 
    .filterNot(List::isEmpty)       // remove empty elements from the flow (if no element has passed in the last second, to avoid error in reducer) 
    .map(this::reduceList)        // reduce each list to sum the ids 
    .log("====== doing reduceing ")     // log each passing element using akka logger, rather than `System.out.println` 
    .mergeSubstreams()         // merge back all elements with different names 
+0

Noch einmal, sorry über den Java-Stil, ich bin wirklich mehr daran gewöhnt, Scala-Code. –

+0

@Cynille Wirklich zu schätzen Ihre Antwort es funktioniert super. Ich habe ein wenig mehr auf GroupedWithin im Java-Stil fixiert ;-). Bitte akzeptieren Sie es und ich werde diese Antwort auch akzeptieren. Das einzige ist, dass es scheint, dass ich native reduzieren-Funktion nicht verwenden kann, um das zu erreichen, denke ich. Aber trotzdem Danke nochmal! – zt1983811

+0

@Cynille Warum ablehnen? .groupedWithin (100, sec) ist nicht in Java enthalten, es sollte diese .groupedWithin (100, FiniteDuration.create (10, TimeUnit.SECONDS)) sein, denke ich. – zt1983811