2016-04-02 5 views
2

Ich bin und Scala Funken, weshalb ich ziemlich harte Zeit durchzustehen habe.Wie schreibe ich Iterator [String] Ergebnis von mapPartitions in eine Datei?

Was ich zu tun beabsichtigen, ist, dass meine Daten mit der Stanford CoreNLP mit Funken vorverarbeitet. Ich verstehe, dass ich mapPartitions zu verwenden, um habe eine StanfordCoreNLP Instanz pro Partition zu haben, wie in this thread vorgeschlagen. Ich weiß jedoch nicht, wie ich von hier aus vorgehen soll.

Am Ende mag ich Wortvektoren auf diesen Daten trainieren, aber jetzt würde ich gerne herausfinden, wie ich meine verarbeiteten Daten von hier bekommen kann und es in einer anderen Datei zu schreiben.

Das ist, was ich bisher habe:

import java.util.Properties 

import com.google.gson.Gson 
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation} 
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP} 
import edu.stanford.nlp.util.CoreMap 
import masterthesis.code.wordvectors.Review 
import org.apache.spark.{SparkConf, SparkContext} 

import scala.collection.JavaConversions._ 

object ReviewPreprocessing { 

    def main(args: Array[String]) { 

    val resourceUrl = getClass.getResource("amazon-reviews/reviews_Electronics.json") 
    val file = sc.textFile(resourceUrl.getPath) 

    val linesPerPartition = file.mapPartitions(lineIterator => { 

     val props = new Properties() 
     props.put("annotators", "tokenize, ssplit, pos, lemma") 

     val sentencesAsTextList : List[String] = List() 
     val pipeline = new StanfordCoreNLP(props) 
     val gson = new Gson() 

     while(lineIterator.hasNext) { 

     val line = lineIterator.next 
     val review = gson.fromJson(line, classOf[Review]) 
     val doc = new Annotation(review.getReviewText) 

     pipeline.annotate(doc) 

     val sentences : java.util.List[CoreMap] = doc.get(classOf[SentencesAnnotation]) 
     val sb = new StringBuilder(); 

     sentences.foreach(sentence => { 
      val tokens = sentence.get(classOf[TokensAnnotation]) 
      tokens.foreach(token => { 
      sb.append(token.get(classOf[LemmaAnnotation])) 
      sb.append(" ") 
      }) 
     }) 
     sb.setLength(sb.length - 1) 
     sentencesAsTextList.add(sb.toString) 
     } 

     sentencesAsTextList.iterator 
    })  

    System.exit(0) 
    } 

} 

Wie würde ich zum Beispiel schreibe dieses Ergebnis in eine einzige Datei? Die Reihenfolge spielt hier keine Rolle - ich denke, die Bestellung ist an diesem Punkt sowieso verloren.

+1

Suchen Sie in 'RDD.saveAsTextFile'. –

Antwort

1

Wenn Sie saveAsTextFile direkt auf Ihrem RDD verwenden, würden Sie so viele Ausgabedateien haben wie viele Partitionen, die Sie haben. Um einen zu haben, können Sie entweder alles in eine Partition verschmelzen wie

sc.textFile("/path/to/file") 
    .mapPartitions(someFunc()) 
    .coalesce(1) 
    .saveAsTextFile("/path/to/another/file") 

Oder (nur zum Spaß) können Sie alle Partitionen Fahrer eins nach dem anderen zu bekommen und alle Daten selbst speichern.

val it = sc.textFile("/path/to/file") 
    .mapPartitions(someFunc()) 
    .toLocalIterator 

while(it.hasNext) { 
    writeToFile(it.next()) 
} 
+0

Danke! Das funktioniert. Das Problem war eine weitere Ausnahme, die ausgelöst wurde. Das Ergebnis ist immer noch ein Verzeichnis '/ home/user/tmp/preprocessed.txt', die eine Datei' Teil 00000' mit dem eigentlichen Inhalt in Ich bin interessiert enthält. Ich frage mich, obwohl, wie ich nennen könnte, dass ^^ – displayname

+0

ich nur realisiert das Zusammenfügen bewirkt, dass das Ganze nur auf einer Partition läuft. Aber nicht * nach * 'someFunc()' - es tut alles nur auf einer Partition - Das ist eigentlich nicht das, was ich vorhatte. – displayname

+0

Wie viele Dateien haben Sie ohne Koaleszenz? Es könnte wenige von ihnen geben, aber nur eine könnte Daten geschrieben haben. – evgenii

Verwandte Themen