2017-09-04 1 views
0

Ich habe folgendes RDD:Scala - Kann nicht variabel nach innen für Schleife zugreifen

1:AAAAABAAAAABAAAAABAAAAAB 
2:BBAAAAAAAAAABBAAAAAAAAAA 

Jeder Charakter ist ein Ereignis. Mi gewünschte Ausgabe soll die Anzahl der Vorkommen pro Ereignisgruppe erhalten. Zu diesem ersten Beispiel sollte der Ausgang sein:

{ "A" -> 6 , "B" -> 6 } 

Mit meinem Code, erhalte ich die gewünschte Ausgabe:

val rdd = sqlContext.sparkContext.makeRDD(Seq(
"1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 
val rddSplited = rdd.map(_.split(":")(1).toList) 
    val values = scala.collection.mutable.Map[String, Long]() 
    var iteracion = 0 
    for (ocurrences <- rddSplited) { 
     var previousVal = "0" 
     for (listValues <- ocurrences) { 
     if (listValues.toString != previousVal) { 
      values.get(listValues.toString) match { 
      case Some(e) => values.update(listValues.toString, e + 1) 
      case None => values.put(listValues.toString, 1) 
      } 
      previousVal = listValues.toString() 
     } 
     } 
     //println(values) //return the values 

    } 
     println(values) //returns an empty Map 

    } 

Das Problem ist, dass der

println (Werte)

gibt keine Daten zurück, aber wenn sie geändert wird, wenn die kommentierte println platziert wird, wird die Map-Werte gibt Werte zurück.

Wie kann ich die endgültigen Werte der Karte nach der Haupt-for-Schleife zurückgeben?

Entschuldigung, wenn meine Implementierung nicht die beste ist, bin ich neu in dieser Scala/Spark-Welt.

Vielen Dank im Voraus.

Ich bearbeite die Frage, um besser zu erklären, was ich zu erreichen versuche, Der Code, der in den Antworten zur Verfügung stellt (danke für alle Ihre Hilfe), gibt die gewünschte Ausgabe nicht zurück. Ich bin nicht versuchen, die Anzahl der Ereignisse zu zählen, was ich brauche, ist die Zahl der Ereignisse zu zählen, wenn ein Ereignis zu einem anderen wechselt, das heißt:

AAAAABAAAAABAAAAABAAAAAB => A-> 4 , B-> 4 
    BBAAAAAAAAAABBAAAAAAAAAA => A-> 2 , B-> 2 

So the final output should be A-> 6 , B-> 6 

Es tut mir leid für das Missverständnis.

+1

Sogar in einem einzigen JVM Scala-Code, der nicht empfohlen werden würde (solche Mutation/Nebeneffekt), also mit Spark ... – cchantep

Antwort

2

Scheint, als ob Sie versuchen, Ihr Ergebnis auf sehr Java-ähnliche Weise zu erreichen. Ich habe eine Scala funktionalen Stil Programm geschrieben, das genau das tut, was Sie wollen, wie folgt:

val rdd = sqlContext.sparkContext.makeRDD(Seq("1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 

rdd.foreach{elem => 
    val splitted = elem.split(":") 
    val out: Seq[Map[Char, Int]] = splitted 
     .tail 
     .toSeq 
     .map(_.groupBy(c => c).map{case (key, values) => key -> values.length}) 
    println(out) 
    } 
+0

Ich habe neue Details zu der Frage hinzugefügt, versuchen, mich besser zu erklären. – AJDF

0

Es gibt mehrere Probleme mit Ihrem Code (gegenseitige Zustand, faul Transformationen), versuchen Sie dies:

val rdd = ss.sparkContext.makeRDD(Seq("1:AAAAABAAAAABAAAAABAAAAAB","2:BBAAAAAAAAAABBAAAAAAAAAA")) 

rdd.foreach{record => 
    val Array(_,events) = record.split(":") 
    val eventCount = events.groupBy(identity).mapValues(_.size) 
    println(eventCount) 
    } 

Beachten Sie, dass Sie die println nicht sehen, wenn Sie map statt foreach (map ist faul) verwenden. Außerdem wird die println an die Stdout der Worker-Knoten Ihres Clusters gesendet, Sie sehen sie nur, wenn Sie den Modus local in Spark verwenden.

+0

Ich habe der Frage neue Details hinzugefügt und versuche mich besser zu erklären. – AJDF

Verwandte Themen