2017-05-19 3 views
0

Ich bin neu zu Spark & scala. Ich habe eine Anforderung, die Anzahl der JSON-Dateien vom Standort S3 aus zu verarbeiten. Diese Daten sind im wesentlichen Chargendaten, die später für eine erneute Verarbeitung aufbewahrt werden würden. Jetzt sollte mein Spark-Job diese Dateien so verarbeiten, dass er 5 rohe JSON-Records auswählen und eine Nachricht an Kafka senden soll. Der Grund für die Auswahl von nur 5 Datensätzen ist die gleichzeitige Verarbeitung von Echtzeit- und Batch-Daten zum gleichen Thema. Daher sollte die Stapelverarbeitung die Echtzeitverarbeitung nicht verzögern.Spark, um rdd Chunk von Chunk aus JSON-Dateien zu verarbeiten und zu Kafka Thema

Ich brauche die ganze JSON-Datei sequentiell zu verarbeiten und so würde ich nur 5 Aufzeichnungen zu einem Zeitpunkt und eine Nachricht an kafka holen und nächste 5 Aufzeichnungen JSON-Datei und so weiter wählen ...

Ich habe Ich habe ein Stück Code geschrieben, das aus JSON-Dateien lesen und es zum Kafka-Thema posten würde.

 val jsonRDD = sc.textFile(s3Location) 

     var count = 0 

     val buf = new StringBuilder 

     jsonRDD.collect().foreach(line => { 
      count += 1 
        buf ++= line 
        if (count > 5) { 
         println(s"Printing 5 jsons $buf") 
         count = 0 
         buf.setLength(0) 
         SendMessagetoKakfaTopic(buf) // psuedo cod for sending message to kafkatopic 
         Thread.sleep(10000) 
        } 
     }) 
     if (buf != null) { 
      println(s"Printing remaining jsons $buf") 
      SendMessagetoKakfaTopic(buf) 
     } 

Ich glaube, es ist eine effizientere Art und Weise JSONs in Funken der Verarbeitung.

Und auch ich sollte auch nach anderen Parametern wie Speicher, Ressourcen usw. suchen. Da die Daten über 100's Gig hinausgehen können.

Antwort

0

Das sieht wie ein Fall für Spark Streaming oder (empfohlen) Spark Structured Streaming.

In jedem Fall überwachen Sie ein Verzeichnis und verarbeiten neue Dateien jedes Batch-Intervall (konfigurierbar).


Sie könnten damit umgehen SparkContext.textFile mit (mit Wildcards) oder SparkContext.wholeTextFiles. In jedem Fall werden Sie schließlich mit RDD[String] enden, um die Zeilen in Ihren JSON-Dateien darzustellen (eine Zeile pro JSON-Datei).

Wenn Ihre Anforderung, die der Reihe nach verarbeiten ist, 5-zeiliges Brocken von 5-Zeilen-Brocken, könnten Sie die Transformation Pipeline effizient durch die Verwendung RDD.toLocalIterator etwas mehr machen:

toLocalIterator: Iterator[T]

Return ein Iterator, der alle Elemente in dieser RDD enthält. Der Iterator wird so viel Speicher belegen wie die größte Partition in dieser RDD.

Siehe RDD API.

Mit Iterator von JSONs würden Sie sliding mit 5 Elementen tun.

Das würde Ihnen ziemlich effiziente Pipeline geben.


ich wieder einmal dringend empfohlen, in Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher) auf Structured Streaming bis zu lesen (es geht um das Lesen aber Schreiben unterstützt auch).

+0

Vielen Dank @Jacek. Gibt es eine Möglichkeit, das Laden von Eingabedateien auf rdd zu kontrollieren? Für Beispiel Wenn 100 Dateien vorhanden sind, sollten 10 Dateien einmal geladen werden und dann 10 Dateien und so weiter. Ich sah, dass es nur eine Möglichkeit gibt, mit der sc.textFile() -Methode zu partitionieren. –

+0

Nein. Keine Möglichkeit, es zu kontrollieren. Es liegt im Ermessen von 'textFile', so viele wie möglich zu laden.Sie könnten versuchen, die von Regex geladenen Dateien im Pfad des Verzeichnisses zu beschränken, z. 's3Lage/*. 2017.06. [0-5] *'. –