2017-04-14 2 views
1

Ich versuche, meine Wordcount Ergebnis in Datei zu speichern.Wert saveAsTextFile ist kein Mitglied von org.apache.spark.streaming.dstream.DStream [(String, Long)]

val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) 
wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

Aber es zeigt

value saveAsTextFile is not a member of org.apache.spark.streaming.dstream.DStream[(String, Long)]    [error]  wordCounts.saveAsTextFile("/home/hadoop/datafile1") 

ich verwende Funke 2.1. Ich zeige eine Antwort, die alte Funkenversion vorschlägt. Aber ich möchte in Funken 2.1 tun. Vielen Dank.

Antwort

0

Sie verwenden eine Methode definiert für RDD auf einem DStream.

Dies ist die Methode auf RDD:

def saveAsTextFile(path: String): Unit 

... mit Beschreibung "speichern RDD als Textdatei, Stringdarstellungen von Elementen verwendet wird."

Dies ist die Methode auf DStream:

saveAsTextFiles(prefix: String, suffix: String = ""): Unit 

... mit Beschreibung "Speichern jeden RDD in diesem DSTREAM wie bei Textdatei, String-Darstellung von Elementen unter Verwendung der Dateiname bei jedem Charge Intervall ist. basierend auf Präfix und Suffix generiert: "prefix-TIME_IN_MS.suffix. "

So die Methodensignaturen unterschiedlich sind -.. Sowohl in Namen und Parameter

in Ihrem Code wordCounts ist offenbar ein DStream, so dass es nicht über eine saveAsTextFile Methode

Aber ich habe das Gefühl, Sie die Abstraktionen sind verwirrend und wirklich wollen, um die einzelnen RDD s in einem DStream Microbatch- enthielt zu schreiben, das zu tun.

counts.foreachRDD { rdd => 
    ...   
    rdd.saveAsTextFiles(s"/home/hadoop/datafile-$timestamp") 

} 
0

API documentation erwähnt die API als "saveAsTextFiles"

saveAsTextFiles(String prefix, String suffix) 

Speichern jedes RDD in diesem DSTREAM wie bei Textdatei, string Darstellung der Elemente verwendet wird.

Verwandte Themen