2017-03-20 4 views
0

Ich bin sehr neu zu Kafka und SparkStreaming. Dieser Sparkstreaming-Consumer liest aus Kafka (Schlüssel ist Zeichenfolge und Wert ist Zeichenfolge). Wenn die Daten keine UTF8-Zeichenfolge mit Escapezeichen enthalten, funktioniert sie einwandfrei. Aber wenn es der Fall ist, schlägt es mit der folgenden Fehlermeldung:MalformedInputException Fehler von SparkStream App Lesen von Kafka Thema

java.nio.charset.MalformedInputException: Input length = 1 
at java.nio.charset.CoderResult.throwException(CoderResult.java:281) 
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) 
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) 
at java.io.InputStreamReader.read(InputStreamReader.java:184) 
at java.io.BufferedReader.fill(BufferedReader.java:161) 
at java.io.BufferedReader.readLine(BufferedReader.java:324) 
at java.io.BufferedReader.readLine(BufferedReader.java:389) 
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72) 
at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:172) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461) 
at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310) 
at scala.collection.AbstractIterator.to(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302) 
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336) 
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289) 
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:935) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:99) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

Hier ist ein Beispiel dieser Zeilen mit entkam UTF8-String (beachten Sie, dass es zwei Schrägstriche enthält):

Los m\\xC3\\xA1s vendidos en ... 

Zunächst vermutete ich Kafka Streaming-Encoding-Konfiguration, aber die Änderung hat nicht geholfen. Am Ende haben wir festgestellt, dass der Fehler auftritt, wenn collect mit den von Python UDF zurückgegebenen Daten ausgeführt wird.

Wenn mehr Informationen benötigt werden, bitte Kommentar. Vielen Dank im Voraus!

Antwort

0

Es stellte sich heraus, dass unsere JVM-Codierung auf der Datenerfassungsseite (aus Python-UDF) auf ISO8859-1 gesetzt wurde. Also haben wir das gelöst, indem wir utf-8 in der Aufruf-Pipe angegeben haben:

Verwandte Themen