2017-04-12 5 views
-2

IN Code unten ich versuche, avro Nachricht von einem kafka Thema, und innerhalb der Karte Methode, wo ich KafkaAvroDecoder fromBytes Methode zu lesen, scheint es nicht die Aufgabe serializable Ausnahme zu verursachen , wie entschlüssele ich die Avro-Nachrichten?Funken strukturiert Streaming (Java): Aufgabe nicht serializable

public static void main (String [] args) Exception wirft {

Properties decoderProps = new Properties(); 
    decoderProps.put("schema.registry.url", SCHEMA_REG_URL); 
    //decoderProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true"); 

    KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(decoderProps)); 


    SparkSession spark = SparkSession 
     .builder() 
     .appName("JavaCount1").master("local[2]") 
     .config("spark.driver.extraJavaOptions", "-Xss4M") 
     .getOrCreate(); 

    Dataset<Row> ds1 = spark 
     .readStream() 
     .format("kafka") 
     .option("kafka.bootstrap.servers", HOSTS) 
     .option("subscribe", "systemDec200Message") 
     .option("startingOffsets", "earliest") 
     .option("maxOffsetsPerTrigger", 1) 
     .load(); 



    Dataset<String> ds2 = ds1.map(m-> { 
     GenericData.Record data = (GenericData.Record)decoder.fromBytes((byte[]) m.get(1)); 

     return "sddasdadasdsadas"; 
}, Encoders.STRING()); 





    StreamingQuery query = ds2.writeStream() 
     .outputMode("append") 
     .format("console") 
     .trigger(ProcessingTime.apply(15)) 
     .start(); 

    query.awaitTermination(); 
} 

i wie unten die Ausnahme erhalten,

17.04.12 16.51.06 INFO Codegenerator: Code erzeugt in 329.145119 ms 17.04.12 16.51.07 eRROR StreamExecution: Abfrage [id = 1d56386c-3fba-4978-8565-6b9c880d4fce, RunID = b7bbb8d8-b52d-4c14-9dec-bc9cb41f8d77] mit Fehlern beendet org.apache.spark .SparkException: Task nicht serialisierbar bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 298) bei org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 288) unter org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 108) unter org.apache.spark.SparkContext.clean (SparkContext.scala: 2094) unter org.apache. spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.Anwendung (RDD.scala: 840) bei org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.Anwendung (RDD.scala: 839) bei org.apache. spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) bei org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112) an org.apache.spark.rdd.RDD.withScope (RDD. Scala: 362) bei org.apache.spark.rdd.RDD.mapPartitionsWithIndex (RDD.scala: 839) bei org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute (WholeStageCodegenExec.scala: 371) bei org.apache.spark .sql.execution.SparkPlan $$ anonfun $ führen Sie $ 1.apply (SparkPlan.scala: 114) unter org.apache.spark.sql.execution.SparkPlan $$ anonfun $ aus, führen Sie $ 1.apply (SparkPlan.scala: 114) bei org .apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 135) unter org.a pache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)

+0

Für 'Aufgabe nicht serializable', ist es der stacktrace Teil, der Ihnen sagt, genau was serialisiert wird –

Antwort

0

die Serialisierung Problem ging weg, nachdem die KAFKA AVRO DECODER Erklärung innerhalb der Lambda-Rahmen bewegt (im Call Karte), aber jetzt eine andere Ausnahme zur Laufzeit

org.codehaus.commons.compiler.CompileException: Datei 'generated.java', Zeile 116, Spalte 101: Kein anwendbarer Konstruktor/Methode gefunden für aktuelle Parameter "long"; Kandidaten sind: "java.lang.Integer (int)", "java.lang.Integer (java.lang.String)" bei org.codehaus.janino.UnitCompiler.compileError (UnitCompiler.java:10174) bei org. codehaus.janino.UnitCompiler.findMostSpecificIInvocable (UnitCompiler.java:7559) bei org.codehaus.janino.UnitCompiler.invokeConstructor (UnitCompiler.java:6505) bei org.codehaus.janino.UnitCompiler.compileGet2 (UnitCompiler.java:4126) bei org.codehaus.janino.UnitCompiler.access $ 7600 (UnitCompiler.java:185) bei org.codehaus.janino.UnitCompiler $ 10.visitNewClassInstance (UnitCompiler.java:3275) bei org.codehaus.janino.Java $ NewClassInstance. accept (Java.java:4085) bei org.codehaus.janino.UnitCompiler.compileGet (UnitCompiler.java:3290) bei org.codehaus.j anino.UnitCompiler.compileGetValue (UnitCompiler.java:4368) bei org.codehaus.janino.UnitCompiler.compileGet2 (UnitCompiler.java:3571)

+0

116. Zeile ist endgültig java.lang.Integer deserializetoobject_value10 = deserializetoobject_resultIsNull3? null: new java.lang.Integer (deserializetoobject_argValue3); /* 117 */deserializetoobject_javaBean.setOffset (deserializetoobject_value10); /* 118 * / – user2221654

Verwandte Themen