2017-08-21 8 views
2

Angenommen, ich habe eine Quelle von verschiedenen Früchten, und ich möchte ihre Anzahl in eine Datenbank einfügen.Gruppierung von Elementen in Scala/Akka Streams

Ich kann etwas tun:

Flow[Fruits] 
.map { item => 
    insertItemToDatabase(item) 
} 

Aber das ist offensichtlich langsam - warum legen Sie auf eine Datenbank mit jedem Element, wenn ich kann gruppieren sie nach oben? So kam ich mit einer besseren Lösung:

Flow[Fruits] 
.grouped(10000) 
.map { items => 
    insertItemsToDatabase(items) 
} 

Aber das bedeutet, dass ich bis 10 000 Elemente halten [banana, orange, orange, orange, banana, ...] im Speicher, bis sie zur Datenbank gespült werden. Ist das nicht ineffizient? Vielleicht kann ich so etwas tun:

Flow[Fruits] 
.grouped(100) 
.map { items => 
    consolidate(items) // this will return Map[String, Int] 
} 
.grouped(100) 
// here I have Seq[Map[String, Int]] 
.map { mapOfItems=> 
    insertMapToDatabase(mapOfItems) 
} 

Von meinem Verständnis, sollte dies auch 10 000 Elemente auf einmal verarbeiten, sollte aber nicht so viel Speicher (vorausgesetzt, die Elemente werden oft wiederholt) in Anspruch nehmen. Aber jede Taste wird immer noch 100 Mal im Speicher wiederholt. Sicher kann ich tun .grouped(10).map().grouped(10).map().grouped(10).map().grouped(10).map() ... Aber gibt es keinen besseren Weg? Vielleicht so etwas wie diese:

Flow[Fruits] 
.map { item => 
    addToMap(item) 
    if(myMap.length == 10000) { 
     insertToDatabase(myMap) 
     clearMyMap() 
    } 
} 

Aber ist es das Konzept nicht brechen von Akka Strömen, nämlich Unabhängigkeit (und damit die Parallelität) von Verarbeitungsstufen?

+0

Schauen Sie sich die Funktion '' groupedWithin'' an. Es benötigt zwei Parameter: eine maximale Anzahl von Elementen und eine Zeitkonstante. Zum Beispiel gibt '' .grupedWithnin (5000, 1.seconds) '' 5000 Elemente zum Verarbeiten, wenn Sie es vor 1 Sekunde erreicht haben, oder es gibt die Anzahl der Elemente in 1 Sekunde an. – alifirat

+0

Danke @alifirat für deinen Vorschlag, aber das ist nur eine andere Art zu gruppieren. Was ich brauche, ist eine andere Art, die Daten, die ich habe, sowohl speicherfreundlich als auch datenbankfreundlich zu verarbeiten. –

Antwort

1

Wenn die Kardinalität des Satzes Fruit niedrig ist, können Sie eine einzelne Map mit allen Counts speichern und diese nach dem Streaming durch alle Fruit-Werte in die Datenbank übertragen.

Zuerst ein Fluss konstruieren, die die laufende Zählung halten:

type Count = Int 

type FruitCount = Map[Fruit, Count] 

val zeroCount : FruitCount = 
    Map.empty[Fruit, Count] withDefaultValue 0 

val appendFruitToCount : (FruitCount, Fruit) => FruitCount = 
    (fruitCount, fruit) => fruitCount + (fruit -> fruitCount(fruit) + 1) 

val fruitCountFlow : Flow[Fruit, FruitCount, NotUsed] = 
    Flow[Fruit].scan(zeroCount)(appendFruitToCount) 

Jetzt eine Sink erstellen, die die letzte FruitCount und materialisieren den Strom erhalten:

val lastFruitCountSink : Sink[FruitCount, _] = Sink.lastOption[FruitCount] 

val fruitSource : Source[Fruit, NotUsed] = ??? 

val lastFruitCountFut : Future[Option[FruitCount]] = 
    fruitSource 
    .via(fruitCountFlow) 
    .to(lastFruitCountSink) 
    .run() 

Die lastFruitCountFut kann dann zum Senden von Werten an die Datenbank verwendet:

lastFruitCountFut foreach (_ foreach (_ foreach { (fruit, count) => 
    insertItemsToDatabase(Iterator.fill(count)(fruit)) 
})) 

Eine Iterator wird verwendet, weil es sich um die speichereffizienteste Sammlung für den Bau einer Obstobjekte handelt.

Diese Lösung speichert nur 1 Map im Speicher, der einen Schlüssel für jeden einzelnen Fruchttyp hat & 1 Ganzzahl für jeden Schlüssel.

Verwandte Themen