Cl Anfang Tasten String
s zu sein, ist kein Problem, wie man es von PairRddFunctions
Unterschrift sehen können:
class PairRDDFunctions[K, V](self: RDD[(K, V)])(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
Tasten kann jede Art sein.
Ich habe noch einfacher Test um dies zu überprüfen:
val r = sc.parallelize(Seq("Robert" -> "2010/01/02" , "John" -> "2011/02/02" , "Robert" -> "2011/02/20"))
r.reduceByKey(_ + " " +_).foreach(println)
Ergebnisse:
(John,2011/02/02)
(Robert,2010/01/02 2011/02/20)
Bitte geben Sie einige weitere Details über Ihren Code, nämlich:
- Typen von Ihrem
RDD
s
- komplettere Code-Schnipsel von dem, was
- Fehlermeldung nicht funktioniert, die Sie
- aufwendig auf, was Sie unter „verschiedene Werte vergleichen“ haben.Vielleicht besser so etwas wie
groupByKey
wird hier fit
Aktualisiert Beispiel:
import java.time.LocalDate
import java.time.temporal.ChronoUnit
import java.time.format.DateTimeFormatter
def parseDate(d:String) =
LocalDate.parse(d,
DateTimeFormatter.ofPattern("yyyy/MM/dd"))
// ChronoUnit.DAYS.between(d1, d2)
val r = sc.parallelize(Seq("Robert" -> "2010/01/02" , "John" -> "2011/02/02" , "Robert" -> "2011/02/20",
"Robert" -> "2011/02/20"))
implicit def localDateOrdering = Ordering.by[LocalDate, Long](_.toEpochDay)
r.groupByKey.flatMap {
case (key, vals) =>
val valsArr = vals.toArray
valsArr.map(parseDate) match {
case x if x.length >= 3 && ChronoUnit.DAYS.between(x.min, x.max) > 10 =>
valsArr.map(key -> _)
case _ => Nil
}
}.foreach(println)
Ergebnis ist:
(Robert,2010/01/02)
(Robert,2011/02/20)
(Robert,2011/02/20)
Ich verwende groupByKey
zu Gruppendatensätzen mit dem gleichen Schlüssel in iterator
, dann in flatMap
es Erator wird in Array
gepuffert, nach der Regel validiert (Länge> 3 und größte Differenz in Tagen ist> 10) und dann, wenn der aktuelle Schlüssel den Regeln entspricht, werden alle Datensätze mit diesem Schlüssel in den ursprünglichen Zustand zurückversetzt.
Auch wenn ich Ihre Anforderungen nicht vollständig verstanden habe, hoffe ich, dass Sie das Beispiel entsprechend Ihren Bedürfnissen ändern können.
Ich bin in der Lage, den Wert zuzuweisen, finden Sie die folgende Antwort: val docsss = rows.map (r => (r (2), r (1)) docsss.foreach (println) (11,3 April 2010) 11 ist Schlüsselwert = "3. April 2010" (11.14 Mai 2011) 11 ist Schlüsselwert = "14. Mai 2011" Das Problem ist, wenn ich dies zu reducebykey nehme erlaubt nur Wert als int, aber ich Ich versuche, eine Zeichenfolge als Wert zu nehmen für zB: ("3 April 2010", "14 Mai 2011") – jk7