2016-06-15 5 views
0
def textfile={ 
    val ssc = new StreamingContext(conf, Seconds(10)) 

    val lines = ssc.textFileStream("hdfs://master:9000/streaming/") 
    val words = lines.flatMap(_.split("\\s")); 

    val pairs = words.map(word => (word, 1)); 
    val wordCounts = pairs.reduceByKey(_ + _); 

    wordCounts.print(); 
    ssc.start(); 
    ssc.awaitTermination(); 

} 

Die Ergebnisse zeigen nichtDie StreamingContext.textfilestream Datei gelesen wird, aber keine Ergebnisse werden auf der Konsole angezeigt

enter image description here

+0

Eingebettetes Bild. –

Antwort

0

textFileStream nur bis scannt die neuen Dateien, nachdem Sie die Streaming-Anwendung zu starten. Wenn Sie die vorhandenen Dateien scannen möchten, können Sie die folgende Problemumgehung verwenden:

fileStream[LongWritable, Text, TextInputFormat](
    directory, 
    filter = path => !path.getName().startsWith("."), 
    newFilesOnly = false).map(_._2.toString) 
Verwandte Themen