2017-12-21 4 views
0

Ich habe benutzerdefinierte KeyedDeserializationSchema kafka Nachrichten deserialisieren und es wie folgt ein:Flink wirft java.io.NotSerializableException

object Job { 
    case class KafkaMsg[K, V](
    key: K, value: V, topic: String, partiton: Int, offset: Long) 

    trait Deser[A] { 
    def deser(a: Array[Byte]): A 
    } 

    object Deser { 

    def apply[A](implicit sh: Deser[A]): Deser[A] = sh 
    def deser[A: Deser](a: Array[Byte]) = Deser[A].deser(a) 

    implicit val stringDeser: Deser[String] = 
     new Deser[String] { 
     def deser(a: Array[Byte]): String = "" 
     } 

    implicit val longDeser: Deser[Long] = 
     new Deser[Long] { 
     def deser(a: Array[Byte]): Long = 0 
     } 
    } 

    class TypedKeyedDeserializationSchema[ 
    K: Deser: TypeInformation, 
    V: Deser: TypeInformation 
    ] extends KeyedDeserializationSchema[KafkaMsg[K, V]] { 

    def deserialize(key: Array[Byte], 
        value: Array[Byte], 
        topic: String, 
        partition: Int, 
        offset: Long 
    ): KafkaMsg[K, V] = 
     KafkaMsg(Deser[K].deser(key), 
       Deser[V].deser(value), 
       topic, 
       partition, 
       offset 
    ) 

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false 

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] = 
     createTypeInformation 
    } 

    def main(args: Array[String]) { 
    val properties = new Properties 
    properties.setProperty("bootstrap.servers", "localhost:9092") 
    properties.setProperty("group.id", "flink-test") 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 

    val stream = env 
     .addSource(new FlinkKafkaConsumer011(
        "topic", 
        new TypedKeyedDeserializationSchema[String, Long], 
        properties 
        )) 
     .print 

    env.execute("Flink Scala API Skeleton") 
    } 
} 

Was mich gibt:

[error] Caused by: java.io.NotSerializableException: l7.Job$Deser$$anon$7 
[error]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
[error]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
[error]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
[error]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
[error]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
[error]   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
[error]   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
[error]   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
[error]   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
[error]   at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
[error]   at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315) 
[error]   at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170) 
[error]   at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164) 
[error]   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:670) 
[error]   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:600) 
[error]   at l7.Job$.main(Job.scala:89) 
[error]   at l7.Job.main(Job.scala) 

Das Problem in meinem Deser Typ ist offensichtlich Klasse wie Implementierung, aber ich verstehe nicht, was genau diesen Fehler verursacht oder wie es zu beheben.

Antwort

1

Ja, der Grund für diesen Fehler ist, dass Ihre Deser im Gegensatz zu TypeInformation nicht erweitert/implementieren Serializable. Um herauszufinden, warum dies passiert, können Sie sich zunächst eine Frage stellen: Warum muss ich implicit val stringDeser und implicit val longDeser deklarieren?

Die Antwort ist, was Scala Compiler tut, wenn es eine generische Constraint in Form von K: Deser: TypeInformation sieht. Und was es tut, ist, dass es es mit implicit Beweisobjekte umschreibt. So können Sie Code in etwa wie folgt umgewandelt:

class TypedKeyedDeserializationSchema[K, V](implicit val kDeserEv: Deser[K], 
              val kTypeInfoEn: TypeInformation[K], 
              val vDeserEv: Deser[V], 
              val vTypeInfoEn: TypeInformation[V]) extends KeyedDeserializationSchema[KafkaMsg[K, V]] { 

    def deserialize(key: Array[Byte], 
        value: Array[Byte], 
        topic: String, 
        partition: Int, 
        offset: Long 
       ): KafkaMsg[K, V] = 
    KafkaMsg(kDeserEv.deser(key), 
     vDeserEv.deser(value), 
     topic, 
     partition, 
     offset 
    ) 

    def isEndOfStream(e: KafkaMsg[K, V]): Boolean = false 

    def getProducedType(): TypeInformation[KafkaMsg[K, V]] = createTypeInformation 
} 

Nun ist es offensichtlich, dass das Objekt vom Typ TypedKeyedDeserializationSchema[String,Long] tatsächlich enthält zwei Felder vom Typ Deser[String] und Deser[Long] mit den Werten aus dem implicit val s Sie oben erklärt. Wenn der Flink also versucht, sicherzustellen, dass die übergebene Funktion Serializable ist, schlägt die Prüfung fehl.

Nun ist die Lösung liegt auf der Hand: Ihre Eigenschaft erweitern machen Deser[A]Serializable

trait Deser[A] extends Serializable { 
    def deser(a: Array[Byte]): A 
} 
Verwandte Themen