2016-05-19 10 views
0

Während"Task nicht serializable" versucht JSON

import play.api.libs.json._ 

case class Person(name: String, lovesPandas: Boolean) 
implicit val personFormat = Json.format[Person] 

val text = """{"name":"Sparky The Bear", "lovesPandas":true}""" 

val jsonParse = Json.parse(text) 
val result = Json.fromJson[Person](jsonParse) 
result.get 

Arbeiten am Jupyter Notebook mit Apache Toree Kernel zu analysieren,

import org.apache.spark._ 
import play.api.libs.json._ 
import play.api.libs.functional.syntax._ 

case class Person(name: String, lovesPandas: Boolean) 
implicit val personReads = Json.format[Person] 

val text = """{"name":"Sparky The Bear", "lovesPandas":true}""" 

val input = sc.parallelize(List(text)) 
val parsed = input.map(Json.parse(_)) 
val result = parsed.flatMap(record => {  
    personReads.reads(record).asOpt 
}) 
result.filter(_.lovesPandas).map(Json.toJson(_)).saveAsTextFile("files/out/pandainfo.json") 

kehrt

Name: org.apache.spark.SparkException 
Message: Task not serializable 
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
[...] 

selbst sagte, obwohl Beispiel ist abgeleitet von https://github.com/databricks/learning-spark/blob/master/src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseJson.scala

Ich verstehe, dass Objekte, die an die anderen Knoten übergeben werden, serialisiert werden müssen und dies scheint nicht möglich zu sein. Ist also etwas falsch mit dem Beispiel oder mache ich etwas falsch? Wie behebe ich das?


Durch die Art und Weise

import org.apache.spark._ 
import play.api.libs.json._ 
import play.api.libs.functional.syntax._ 

val text = """{"name":"Sparky The Bear", "lovesPandas":true}""" 

case class Person(name: String, lovesPandas: Boolean) 

val input = sc.parallelize(List(text)) 
val parsed = input.map(Json.parse(_)) 
val result = parsed.flatMap(record => { 
    implicit val personReads = Json.format[Person] 
    personReads.reads(record).asOpt 
}) 
result.collect 

führt in

Name: org.apache.spark.SparkException 
Message: Job aborted due to stage failure: Task 3.0 in stage 0.0 (TID 3) had a not serializable result: play.api.libs.json.OFormat$$anon$1 
Serialization stack: 
    - object not serializable (class: play.api.libs.json.OFormat$$anon$1, value: 
[...] 

ich result.collect, wenn dieser Teil des Codes zu testen korrekt ist.

Zusätzlich, wenn ich

schreiben
result. filter(_.lovesPandas).map{Json.toJson(_)}.saveAsTextFile("files/out/pandainfo.json") 

statt result.collect ich

Name: Compile Error 
Message: <console>:166: error: No Json serializer found for type Person. Try to implement an implicit Writes or Format for this type. 
         Json.toJson(_) 
           ^
StackTrace: 

so denke, ich habe Person zu erklären Serializable zu sein. Allerdings Zugabe extends Serializable es am Ende keine Wirkung, während with Serializable den Fehler

Name: Compile Error 
Message: <console>:2: error: ';' expected but 'with' found. 
     case class Person(name: String, lovesPandas: Boolean) with Serializable 
                  ^
+0

'with' für die Erweiterung mehrere Züge ist ... Sie müssen' erweitert Serializable' –

+0

Ich bin nicht sicher, Spark serializable hat alles viel zu tun mit Serializable von Java, so dass das Hinzufügen von "extends Serializable" nichts bewirkt. Funktioniert der genaue Code auf der Beispielseite, auf den Sie verwiesen haben, auch für Sie? (d. h. ohne irgendwelche Ihrer Änderungen)? –

+0

@TheArchetypalPaul Spark hat kein eigenes serialisierbares Attribut. Es sucht nach dem serialisierbaren Java, wenn es seinen CloserCleaner ausführt. –

Antwort

0

wirft werde ich nach Gefühl gehen und sagen, dass der Rückgabewert von Json.format nicht serialisierbar ist.

um es zu bekommen, Sie den Wert innerhalb flatMap erklären kann:

val result = parsed.flatMap(record => {  
    val personReads = Json.format[Person] 
    val jsValue = Json.parse(record) 
    personReads.reads(jsValue).asOpt 
}) 

bearbeiten

Ich denke, was Probleme verursacht, ist die Tatsache, dass Json.parse kehrt JsValue die nicht serialisierbar ist.

Sie können diese einzuengen auf einen einzigen map:

sc 
    .parallelize(List(text)) 
    .map(record => { 
    val personReads = Json.format[Person] 
    val jsValue = Json.parse(record) 
    personReads.reads(jsValue).asOpt 
    }) 
.filter(_.lovesPandas) 
.map(Json.toJson(_).toString) 
.saveAsTextFile("files/out/pandainfo.json") 
+0

Ich habe versucht (siehe Update). – Make42

+0

@ Make42 Können Sie den vollständigen Stack-Trace mit dem Fehler msg posten, wenn Sie diesen Code ausführen? Ich möchte die Objekthierarchie sehen, die Spark ausgibt. –

+0

@ Make42 Siehe mein update –

Verwandte Themen