2016-10-06 2 views
1

Ich habe eine RDD mit folgenden Struktur:Spark-Summe durch Schlüssel ohne Reduzierung Reihen

(lang, id, name, max, min) 

Ich mag eine weitere Spalte hinzuzufügen, total, die die Subtraktion des Maximalwertes der Kolonne hält max und das Minimum von Spalte min für jede eindeutige lang (ohne die Anzahl der Zeilen zu reduzieren). So würde ich so etwas wie

rdd: 
+----+--+----+---+---+ 
|lang|id|name|max|min| 
+----+--+----+---+---+ 
| en| | | 5| 1| 
| en| | | 2| 0| 
| de| | | 9| 2| 
| en| | | 7| 1| 
| nl| | | 3| 0| 
| nl| | | 5| 1| 
+----+--+----+---+---+ 

Um

rdd: 
+----+--+----+---+---+-----+ 
|lang|id|name|max|min|total| 
+----+--+----+---+---+-----+ 
| en| | | 5| 1| 7| 
| en| | | 2| 0| 7| 
| de| | | 9| 2| 7| 
| en| | | 7| 1| 7| 
| nl| | | 3| 0| 5| 
| nl| | | 5| 1| 5| 
+----+--+----+---+---+-----+ 

Aus Kompatibilitätsgründen bekommen, ich mag diesen ohne mit Datenrahmen/Spark-SQL erreichen.

Jeder Vorschlag wird sehr geschätzt!

+0

für lang = de, max der Spalte max = 7 und min Spalte min = 0, verbinden sie so sollte die Gesamt 7 sein - 0 = 7 und nicht 6. Bitte lesen Ihre Ausgabe und korrigieren Sie es – eliasah

Antwort

1

können Sie aggregieren:

val rdd = sc.parallelize(Seq(
    ("en", "id1", "name1", 5, 1), ("en", "id2", "name2", 2, 0), 
    ("de", "id3", "name3", 9, 2), ("en", "id4", "name4", 7, 1), 
    ("nl", "id5", "name5", 3, 0), ("nl", "id6", "name6", 5, 1) 
)) 

val totals = rdd.keyBy(_._1).aggregateByKey((Long.MinValue, Long.MaxValue))(
    { case ((maxA, minA), (_, _, _, maxX, minX)) => 
    (Math.max(maxA, maxX), Math.min(minA, minX)) }, 
    { case ((maxA1, minA1), (maxA2, minA2)) => 
    (Math.max(maxA1, maxA2), Math.min(minA1, minA2))} 
).mapValues { case (max, min) => max - min } 

mit den ursprünglichen Daten verbinden:

val vals = rdd.keyBy(_._1).join(totals).values 

und flach (mit Shapeless):

import shapeless.syntax.std.tuple._ 

val result = vals.map { case (x, y) => x :+ y } 

result.toDF.show 

mit einem Ausgang:

+---+---+-----+---+---+---+ 
| _1| _2| _3| _4| _5| _6| 
+---+---+-----+---+---+---+ 
| en|id1|name1| 5| 1| 7| 
| en|id2|name2| 2| 0| 7| 
| en|id4|name4| 7| 1| 7| 
| de|id3|name3| 9| 2| 7| 
| nl|id5|name5| 3| 0| 5| 
| nl|id6|name6| 5| 1| 5| 
+---+---+-----+---+---+---+ 

aber für komplexe Aggregationen wird dies mühsam, ineffizient und schwer zu verwalten ziemlich schnell.

1

Sie haben zwei Betrieb auf RDD auszuführen

1.Reducebykey

2.Join

val rdd = originalRDD.rdd.map(row => 
(row(0), (row(1).toString.toLong, row(2).toString.toLong)) 
) 

Bewerben reducebyKey und erhalten die Min- und Max-Werte jedes lang

val filterRDD = jsonRdd.reduceByKey(minMax).map(row => (row._1, (row._2._1-row._2._2))) 

    def minMax(a: Tuple2[Long, Long], b: Tuple2[Long, Long]):Tuple2[Long,Long] = { 
    val min = if (a._1 < b._1) a._1 else b._1 
    val max = if (a._2 > b._2) a._2 else b._2 
    (min, max) 
    } 

Bewerben Zustand

rdd.join(filterRDD).map(row => (row._1, row._2._1._1, row._2._1._2, row._2._2)) 
Verwandte Themen