2016-08-03 10 views
0

Angenommen, ich habe Datensätze wie folgt aus:Apache Flink - Summe und gruppiert halten

("a-b", "data1", 1) 
("a-c", "data2", 1) 
("a-b", "data3", 1) 

Wie kann ich Gruppe und die Summe in Apache Flink, so dass ich die folgenden Ergebnisse haben?

("a-b", ["data1", "data3"], 2) 
("a-c", ["data2"], 1) 

Grüße, Kevin

Antwort

2

ich dies in der Flink Schale erreicht haben ($FLINK_HOME/bin/start-scala-shell.sh local) mit dem folgenden Code:

import org.apache.flink.util.Collector 
benv. 
    fromElements(("a-b", "data1", 1), ("a-c", "data2", 1), ("a-b", "data3", 1)). 
    groupBy(0). 
    reduceGroup { 
    (it: Iterator[(String, String, Int)], out: Collector[(String, List[String], Int)]) => { 
     // Watch out: if the group is _very_ large this can lead to OOM errors 
     val group = it.toList 
     // For all groups with at least one element (prevent out-of-bounds) 
     if (group.length > 0) 
     // Get the "name", all the elements and their third-column aggregate 
     out.collect((group(0)._1, group.map(_._2), group.map(_._3).sum)) 
    } 
    }.print 

Mit der folgenden Ausgabe

(a-b,List(data1, data3),2) 
(a-c,List(data2),1) 
+0

Thank Sie! Ich hatte gehofft, dass es zu diesem Zweck etwas anderes als eine MapReduce-ähnliche Lösung gibt, aber das ist (noch?) Nicht der Fall. Danke nochmal :-)! –

Verwandte Themen