Ich bin neu zu Spark & scala. Ich habe eine Anforderung, die Anzahl der JSON-Dateien vom Standort S3 aus zu verarbeiten. Diese Daten sind im wesentlichen Chargendaten, die später für eine erneute Verarbeitung aufbewahrt werden würden. Jetzt sollte mein Spark-Job diese Dateien so verarbeiten, dass er 5 rohe JSON-Records auswählen und eine Nachricht an Kafka senden soll. Der Grund für die Auswahl von nur 5 Datensätzen ist die gleichzeitige Verarbeitung von Echtzeit- und Batch-Daten zum gleichen Thema. Daher sollte die Stapelverarbeitung die Echtzeitverarbeitung nicht verzögern.Spark, um rdd Chunk von Chunk aus JSON-Dateien zu verarbeiten und zu Kafka Thema
Ich brauche die ganze JSON-Datei sequentiell zu verarbeiten und so würde ich nur 5 Aufzeichnungen zu einem Zeitpunkt und eine Nachricht an kafka holen und nächste 5 Aufzeichnungen JSON-Datei und so weiter wählen ...
Ich habe Ich habe ein Stück Code geschrieben, das aus JSON-Dateien lesen und es zum Kafka-Thema posten würde.
val jsonRDD = sc.textFile(s3Location)
var count = 0
val buf = new StringBuilder
jsonRDD.collect().foreach(line => {
count += 1
buf ++= line
if (count > 5) {
println(s"Printing 5 jsons $buf")
count = 0
buf.setLength(0)
SendMessagetoKakfaTopic(buf) // psuedo cod for sending message to kafkatopic
Thread.sleep(10000)
}
})
if (buf != null) {
println(s"Printing remaining jsons $buf")
SendMessagetoKakfaTopic(buf)
}
Ich glaube, es ist eine effizientere Art und Weise JSONs in Funken der Verarbeitung.
Und auch ich sollte auch nach anderen Parametern wie Speicher, Ressourcen usw. suchen. Da die Daten über 100's Gig hinausgehen können.
Vielen Dank @Jacek. Gibt es eine Möglichkeit, das Laden von Eingabedateien auf rdd zu kontrollieren? Für Beispiel Wenn 100 Dateien vorhanden sind, sollten 10 Dateien einmal geladen werden und dann 10 Dateien und so weiter. Ich sah, dass es nur eine Möglichkeit gibt, mit der sc.textFile() -Methode zu partitionieren. –
Nein. Keine Möglichkeit, es zu kontrollieren. Es liegt im Ermessen von 'textFile', so viele wie möglich zu laden.Sie könnten versuchen, die von Regex geladenen Dateien im Pfad des Verzeichnisses zu beschränken, z. 's3Lage/*. 2017.06. [0-5] *'. –