2016-09-03 3 views
5

Wenn von Kafka-Streaming mit Spark-2.0, mir die folgenden Fehlermeldung erhalten:Nicht Serializable Ausnahme beim Lesen Kafka zeichnet mit Funken Streaming

org.apache.spark.SparkException: 
Job aborted due to stage failure: 
Task 0.0 in stage 1.0 (TID 1) had a not serializable result: 
org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
    - object not serializable (class: 
org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(
topic = mytopic, partition = 0, offset = 422337, 
CreateTime = 1472871209063, checksum = 2826679694, 
serialized key size = -1, serialized value size = 95874, 
key = null, value = <JSON GOES HERE...> 

ist hier der relevante Teil des Codes:

val ssc = new StreamingContext(sc, Seconds(2)) 

val topics = Array("ecfs") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 

stream 
    .map(_.value()) 
    .flatMap(message => { 
    // parsing here... 
    }) 
    .foreachRDD(rdd => { 
    // processing here... 
    }) 

ssc.start() 

Von was ich sagen kann, ist es diese Linie, die das Problem verursacht .map(_.value()), wie kann das behoben werden?

Antwort

0

Sie können .map auf Dstream nicht verwenden: [String, String] wie Sie dort verwendet haben. Ich glaube, Sie verwandeln können und dann Karte gelten wie folgt

val streamed_rdd_final = streamed_rdd.transform{ rdd => rdd.map(x => x.split("\t")).map(x=>Array(check_time_to_send.toString,check_time_to_send_utc.toString,x(1),x(2),x(3),x(4),x(5))).map(x => x(1)+"\t"+x(2)+"\t"+x(3)+"\t"+x(4)+"\t"+x(5)+"\t"+x(6)+"\t"+x(7)+"\t")}

oder können Sie verwenden .map wie früher, sondern _.value tun() sollten Sie versuchen, eine Funktion in die Karte zu senden, wie ich tat unter

stream.map{case (x, y) => (y.toString)} 
Verwandte Themen