2016-03-22 8 views
1

Wir verwenden Spark 1.4 für Spark-Streaming. Kafka ist eine Datenquelle für den Spark-Stream.Datei wird bei Verwendung von saveAsNewAPIHadoopFile überschrieben

Aufzeichnungen werden jede Sekunde auf Kafka veröffentlicht. Unsere Anforderung besteht darin, auf Kafka veröffentlichte Datensätze in einem einzigen Ordner pro Minute zu speichern. Der Stream liest alle fünf Sekunden Datensätze. Zum Beispiel werden Aufzeichnungen, die während 1200 PM und 1201 PM veröffentlicht wurden, in dem Ordner "1200" gespeichert; zwischen 1201PM und 1202PM im Ordner "1201" und so weiter.

Der Code, den ich schrieb ist als

//First Group records in RDD by date 
stream.foreachRDD (rddWithinStream -> { 
    JavaPairRDD<String, Iterable<String>> rddGroupedByDirectory = rddWithinStream.mapToPair(t -> { 
    return new Tuple2<String, String> (targetHadoopFolder, t._2()); 
}).groupByKey(); 
// All records grouped by folders they will be stored in 


// Create RDD for each target folder. 
for (String hadoopFolder : rddGroupedByDirectory.keys().collect()) { 
    JavaPairRDD <String, Iterable<String>> rddByKey = rddGroupedByDirectory.filter(groupedTuples -> { 
    return groupedTuples._1().equals(hadoopFolder); 
    }); 

// And store it in Hadoop 
    rddByKey.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class); 
} 

folgt Da der Stream-Daten verarbeitet alle fünf Sekunden, wird saveAsNewAPIHadoopFile mehrmals in einer Minute aufgerufen. Dies bewirkt, dass die Datei "Part-00000" jedes Mal überschrieben wird.

Ich erwartete, dass saveAsNewAPIHadoopFile in dem Verzeichnis, das durch den Parameter "directory" angegeben wurde, die Datei part-0000N weiterhin erstellt, auch wenn ich einen einzigen Worker-Knoten habe.

Jede Hilfe/Alternativen werden sehr geschätzt.

Danke.

Antwort

1

In diesem Fall müssen Sie Ihren Ausgabepfad und Dateinamen selbst erstellen. Die inkrementelle Dateibenennung funktioniert nur, wenn die Ausgabeoperation direkt unter DStream aufgerufen wird (nicht pro RDD).

Die Argumentfunktion in stream.foreachRDD kann Time Informationen für jeden Mikro-Batch erhalten. Mit Bezug auf Spark documentation:

def foreachRDD(foreachFunc: (RDD[T], Time) ⇒ Unit) 

So können Sie jede RDD speichern wie folgt:

stream.foreachRDD((rdd, time) -> { 
    val directory = timeToDirName(prefix, time) 
    rdd.saveAsNewAPIHadoopFile(directory, String.class, String.class, TextOutputFormat.class); 
}) 
+0

Meine timeToDirName func (dir + Zeit), und nach der Ausführung zeigt es das Verzeichnis, in hdfs aber wenn ich versuche, Zugriff darauf, es zeigt "Dir_Name existiert nicht" – JSR29

Verwandte Themen