IN Code unten ich versuche, avro Nachricht von einem kafka Thema, und innerhalb der Karte Methode, wo ich KafkaAvroDecoder fromBytes Methode zu lesen, scheint es nicht die Aufgabe serializable Ausnahme zu verursachen , wie entschlüssele ich die Avro-Nachrichten?Funken strukturiert Streaming (Java): Aufgabe nicht serializable
public static void main (String [] args) Exception wirft {
Properties decoderProps = new Properties();
decoderProps.put("schema.registry.url", SCHEMA_REG_URL);
//decoderProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
KafkaAvroDecoder decoder = new KafkaAvroDecoder(new VerifiableProperties(decoderProps));
SparkSession spark = SparkSession
.builder()
.appName("JavaCount1").master("local[2]")
.config("spark.driver.extraJavaOptions", "-Xss4M")
.getOrCreate();
Dataset<Row> ds1 = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", HOSTS)
.option("subscribe", "systemDec200Message")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger", 1)
.load();
Dataset<String> ds2 = ds1.map(m-> {
GenericData.Record data = (GenericData.Record)decoder.fromBytes((byte[]) m.get(1));
return "sddasdadasdsadas";
}, Encoders.STRING());
StreamingQuery query = ds2.writeStream()
.outputMode("append")
.format("console")
.trigger(ProcessingTime.apply(15))
.start();
query.awaitTermination();
}
i wie unten die Ausnahme erhalten,
17.04.12 16.51.06 INFO Codegenerator: Code erzeugt in 329.145119 ms 17.04.12 16.51.07 eRROR StreamExecution: Abfrage [id = 1d56386c-3fba-4978-8565-6b9c880d4fce, RunID = b7bbb8d8-b52d-4c14-9dec-bc9cb41f8d77] mit Fehlern beendet org.apache.spark .SparkException: Task nicht serialisierbar bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 298) bei org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean (ClosureCleaner.scala: 288) unter org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 108) unter org.apache.spark.SparkContext.clean (SparkContext.scala: 2094) unter org.apache. spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.Anwendung (RDD.scala: 840) bei org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsWithIndex $ 1.Anwendung (RDD.scala: 839) bei org.apache. spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151) bei org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 112) an org.apache.spark.rdd.RDD.withScope (RDD. Scala: 362) bei org.apache.spark.rdd.RDD.mapPartitionsWithIndex (RDD.scala: 839) bei org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute (WholeStageCodegenExec.scala: 371) bei org.apache.spark .sql.execution.SparkPlan $$ anonfun $ führen Sie $ 1.apply (SparkPlan.scala: 114) unter org.apache.spark.sql.execution.SparkPlan $$ anonfun $ aus, führen Sie $ 1.apply (SparkPlan.scala: 114) bei org .apache.spark.sql.execution.SparkPlan $$ anonfun $ executeQuery $ 1.apply (SparkPlan.scala: 135) unter org.a pache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 151)
Für 'Aufgabe nicht serializable', ist es der stacktrace Teil, der Ihnen sagt, genau was serialisiert wird –