1

Ich habe den folgenden Code zum Starten einer SQL-Abfrage beim Streaming. Mein Problem ist, dass nach einem der Ergebnisse eine ArrayIndexOutOfBoundsException angezeigt wird. Warum passiert das?Spark SQL über Streaming - ArrayIndexOutOfBoundsException

import org.apache.spark._ 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.streaming.Duration 

import org.apache.spark.sql.functions.udf 

object StreamingSQL { 

    case class Persons(name: String, age: Int) 

    def main(args: Array[String]) { 

     val sparkConf = new SparkConf().setMaster("local").setAppName("HdfsWordCount") 
     val sc = new SparkContext(sparkConf) 
     // Create the context 
     val ssc = new StreamingContext(sc, Seconds(2)) 

     val lines = ssc.textFileStream("/home/cloudera/Smartcare/stream/") 
     lines.foreachRDD(rdd=>rdd.foreach(println)) 

     val sqc = new SQLContext(sc); 
     //import sqc.createSchemaRDD 
     import sqc.implicits._ 

    // Create the FileInputDStream on the directory and use the 
    // stream to count words in new files created 

     lines.foreachRDD{rdd=> 
      val persons = rdd.map(_.split(",")).map(p => Persons(p(0), p(1).trim.toInt)).toDF() 
      persons.registerTempTable("data") 
      val teenagers = sqc.sql("SELECT name FROM data WHERE age >= 13 AND age <= 19") 
      teenagers.foreach(println) 
    } 

    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

Dies ist die Ausgabe, die ich bekomme. Nach einem korrekten Ergebnis springe ich den Fehler:

16/03/23 16:58:56 INFO GenerateUnsafeProjection: Code generated in 131.828141 ms 
[Edgar] 
16/03/23 16:58:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) 
java.lang.ArrayIndexOutOfBoundsException: 1 

Mein txt ist:

Ana,31 
Edgar,16 
Luis,22 
Noelia,26 
Isabel50 
Pablo,34 
Laura,18 
Paco,17 
+0

Ich würde starten, indem Sie überprüfen, ob die tatsächlichen Daten in der RDD zwei Felder hat. – eliasah

+0

Ich habe überprüft, dass die RDD zwei Felder hat. – nest

+0

Ich kann es jetzt nicht testen, aber es ist gut, dass Sie mindestens ein Eingabedatenbeispiel bereitgestellt haben. Können Sie versuchen, zu überprüfen, ob es Ihnen den gleichen Fehler gibt, wenn Sie es als eine ganze RDD lesen und die Transformation durchführen? – eliasah

Antwort

3

Es ist, weil Isabel50 kein Komma hat. Ihr split(",") gibt nur einen Wert für diese Zeile zurück, so dass p(1) für diese Zeile fehlschlägt.