-Update zu versenken: Ich habe meine Frage in test project zu erklären, was ich im Detail bedeutenAkka Strom Schiebefenster emittieren zu steuern reduce von SourceQueue
============== ================================================= =====
Ich habe Akka-Quelle, die contiune aus der Datenbanktabelle lesen, und groupby einige Schlüssel dann reduzieren. Allerdings scheint es, nachdem ich reduzieren Funktion anwenden, die Daten nie zu sinken, wird es kontinuierlich reduzieren, da Upstream immer Daten kommen.
Ich lese einige Post und versuchte gruppiertWithin und gleitend, aber es funktioniert nicht, wie ich dachte, es nur die Nachricht zu größeren Teil gruppieren, aber nie die Upstream-Pause und emittieren, um zu sinken. Im Anschluss wird der Code in Akka Strom 2.5.2
Die Quelle reduzieren Code:
source = source
.groupedWithin(100, FiniteDuration.apply(1, TimeUnit.SECONDS))
.sliding(3, 1)
.mapConcat(i -> i)
.mapConcat(i -> i)
.groupBy(2000000, i -> i.getEntityName())
.map(i -> new Pair<>(i.getEntityName(), i))
.reduce((l, r) ->{ l.second().setAction(r.second().getAction() + l.second().getAction()); return l;})
.map(i -> i.second())
.mergeSubstreams();
The Sink und laufen:
Sink<Object, CompletionStage<Done>> sink =
Sink.foreach(i -> System.out.println(i))
final RunnableGraph<SourceQueueWithComplete<Object>> run = source.toMat(sink, Keep.left());
run.run(materIalizer);
Ich habe auch versucht .takeWhile (prädizierten); Ich verwende den Timer, um den Prädikat-Wert true und false zu wechseln, aber es scheint, dass nur der erste Schalter auf false gesetzt wird, wenn ich zurück auf true umschalte, ist es kein Neustart stromaufwärts.
Bitte helfen Sie mir, danke im Voraus!
============================================== ===
Update für
Informationen über die Art der Elemente
hinzufügen, was ich will: habe ich Klasse Anruf SystemCodeTracking
enthält 2 Attribute (id, entityName)
werde ich Liste des Objekts hat: (1, "table1"), (2, "table2"), (3, "table3"),(4, "table1"),(5, "table3")
Ich mag entityName GROUPBY dann die ID Summe ist daher das Ergebnis Ich mag würde, um zu sehen folgende
("table1" 1+4),("table3", 3+5),("table2", 2)
Der Code, den ich jetzt tue, ist
folgendesource
.groupBy(2000000, systemCodeTracking -> systemCodeTracking.getEntityName)
.map(systemCodeTracking -> new Pair<String, Integer>(systemCodeTracking.getEntityName, SystemCodeTracking.getId()))
.scan(....)
meine Frage jetzt ist mehr darüber, wie Scan inital Staat aufzubauen soll ich tun?
scan(new Pair<>("", 0), (first, second) -> first.setId(first.getId() + second.getId()))
Warum setzen Sie 'mapConcat' nach Ihrer Gruppierung fließt? –
Weil groupedWithin und sliding mir eine Liste von Objekten geben, aber ich brauche nur eins nach dem anderen zu reduzieren. Soll ich die ganze Liste reduzieren? – zt1983811
Ja. Sie sollten einen der beiden Werte (gleitend oder gruppiert) verwenden und dann jedes Listenelement reduzieren. Ich glaube, was Sie wirklich wollen, ist gruppiertWithin, es ist genau ein Zeitfenster (mit maximaler Größe). –