2016-04-26 14 views
3

Hallo Ich habe Code einfach erhalten Wortzählungen aus einem Dokument. Ich muss auch eine Karte verwenden, um den Datenwert nachzuschlagen, bevor die Ausgabe erzeugt wird. Hier ist der Code.reduceByKey ist kein Mitglied

requests 
    .filter(_.description.exists(_.length > 0)) 
    .flatMap { case request => 
     broadcastDataMap.value.get(request.requestId).map { 
     data => 
      val text = Seq(
      data.name, 
      data.taxonym, 
      data.pluralTaxonym, 
      request.description.get 
     ).mkString(" ") 
      getWordCountsInDocument(text).map { case (word, count) => 
      (word, Map(request.requestId -> count)) 
      } 
     } 
    } 
    .reduceByKey(mergeMap) 

Die Fehlermeldung ist

reduceByKey is not a member of org.apache.spark.rdd.RDD[scala.collection.immutable.Map[String,scala.collection.immutable.Map[Int,Int]]] 

Wie kann ich dieses Problem lösen? Ich muss getWordCountsInDocument aufrufen. Vielen Dank!

+0

brauchen Sie PairRDD. Versuchen Sie, .map() vor reduceByKey – Natalia

Antwort

3

reduceByKey ist ein Mitglied von PairRDDFunctions. Es wird implizit RDDs im Format RDD[(K, V)] hinzugefügt. Sie müssen wahrscheinlich die Struktur zu einem RDD[String, Map[Int,Int]] flachen.

Wenn Sie die Typen für Ihre Eingaben bereitstellen können (requests, broadcastDataMap und mergeMap), können wir möglicherweise einige Hilfe bei dieser Konvertierung bereitstellen.

Von den Typen zur Verfügung gestellt und der Annahme, dass der Rückgabetyp von getWordCountsInDocument einige Collection [(Wort, zählen: Int)]

ändern:

broadcastDataMap.value.get(request.requestId).map { 

zu

broadcastDataMap.value.get(request.requestId).flatMap { 

sollte das Problem beheben.

+0

Danke zu verwenden. # 1: Anfragen ist ein RDD der Anfrage, die (requestId, Beschreibung) # 2 hat: broadcastDataMap ist eine Übertragung von Map [requestID, Daten (Name, Taxonym, PluralTaxonym)] # 3: MergeMap ist eine Funktion, die zwei Map [Int , Int] und gibt eine Map zurück [Int, Int]: private def mergeMap (map1: Karte [Int, Int], map2: Karte [Int, Int]): Karte [Int, Int] = { (map1 ++ map2) .map {case (key, _) => (schlüssel, map1.getOrElse (schlüssel, 0) + map2.getOrElse (schlüssel, 0)) } } – RandomBookmark

Verwandte Themen