2016-08-11 4 views
0

Ich habe über Broadcast-Variablen in Spark gelernt, also habe ich versucht, es zu nutzen. Ich verwende Spark-Shell (Version 1.6.0). Im Anschluss ist mein Code:Objekt nicht serialisierbar org.apache.spark.SparkContext

scala> val pageurls = sc.parallelize(List(("www.google.com","Google"),("www.yahoo.com","Yahoo")) 
pageurls: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[0] at parallelize at <console>:27 
    scala> val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
    pageCounts: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at parallelize at <console>:27 
    scala> val pageMaps = pageurls.collectAsMap 
    pageMaps: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google) 
    scala> val bMaps = sc.broadcast(pageMaps) 
    bMaps: org.apache.spark.broadcast.Broadcast[scala.collection.Map[String,String]] = Broadcast(2) 
    scala> bMaps.value 
    res0: scala.collection.Map[String,String] = Map(www.yahoo.com -> Yahoo, www.google.com -> Google) 
    scala> val newRdd = pageCounts.map{ 
| case (url,count) => (url,bMaps.value(url),count)} 
    newRdd: org.apache.spark.rdd.RDD[(String, String, Int)] = MapPartitionsRDD[2] at map at <console>:35 
    scala> newRdd.collect 
    res1: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

Der Code funktionierte gut, wenn ich Funken Shell ausgeführt und den Standard SparkContext sc verwenden, die erstellt werden, wenn Funken Shell aufgerufen wird. Ich habe jedoch meinen eigenen SparkContext erstellt und versucht, dieselbe Code-Sequenz auszuführen. Vor meiner eigenen Kontext zu schaffen, halte ich die SparkContext mit sc.stop erstellt Standard

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
val sc = new SparkContext(conf) 

Als ich SparkContext wie diese und verwenden Broadcasts Variable zu erstellen, ich die folgende Ausnahme erhalten: org.apache.spark.SparkException: Aufgabe nicht serializable

verursacht durch: java.io.NotSerializableException: org.apache.spark.SparkConf

Warum das so ist es passiert und was soll ich tun, damit ich diese Fehler nicht bekommen Alles, was ich? fehlt?

Antwort

0

Wenn Sie die Spark-Shell starten, erstellt Spark-Shell funconcontext [sc] für Sie. Ein JVM kann nur eine Funkenschale haben. Sie versuchen, eine andere Spark-Shell in demselben jvm zu erstellen. Es scheint, dass die Version von spark Sie eingeschaltet sind, sparkConf wirft die Ausnahme der Klasse, die nicht serialisierbar ist. Um zu vermeiden, diese Ausnahme Gebrauch:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf() 
conf.setAppName("MyApp") 
conf.set("spark.driver.allowMultipleContexts", "true") 
conf.setMaster("local") 
val sc = new SparkContext(conf) 

Referenzen: a] Multiple SparkContext detected in the same JVM

b] https://issues.apache.org/jira/browse/SPARK-2243

bearbeiten Lösung 1: Make-Funktion für den Rundfunk variabel und nennen es von Shell:

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
val sc = new SparkContext(conf) 
val pageurls = sc.parallelize(List(("www.google.com","Google"), ("www.yahoo.com","Yahoo"))) 
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
val pageMaps = pageurls.collectAsMap 
object Test{ 
def bVar(sc:SparkContext, pageMaps: scala.collection.Map[String, String]) = { 
    val bMaps = sc.broadcast(pageMaps) 
    bMaps.value 
    val newRdd = pageCounts.map{case (url,count) => (url,bMaps.value(url),count)} 
    newRdd.collect 
}} 
val result = Test.bVar(sc, pageMaps) 
result: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

Referenz: Spark Accumulator throws "Task not serializable" error

Lösung 2: Wenn Sie darauf bestehen, keine Funktion aus der Shell zu verwenden, machen Sie Sparkcontext und Sparkconf als Transienten.

sc.stop 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
@transient val conf = new SparkConf().setMaster("local").setAppName("MyApp") 
@transient val sc = new SparkContext(conf) 
val pageurls = sc.parallelize(List(("www.google.com","Google"), ("www.yahoo.com","Yahoo"))) 
val pageCounts = sc.parallelize(List(("www.google.com",90),("www.yahoo.com",10))) 
val pageMaps = pageurls.collectAsMap 
val bMaps = sc.broadcast(pageMaps) 
bMaps.value 
val newRdd = pageCounts.map{case (url,count) => (url,bMaps.value(url),count)} 
newRdd.collect 
res3: Array[(String, String, Int)] = Array((www.google.com,Google,90), (www.yahoo.com,Yahoo,10)) 

Referenz: Should I leave the variable as transient?

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/

+0

habe ich vergessen zu erwähnen, dass, bevor ich meinen eigenen Kontext schaffen, ich den Standardkontext durch Funken (sc.stop) erstellt stoppen. – user2430771

+0

OKAY. Könnten Sie bitte die Ausgabe von println (sc.getConf.toDebugString) posten, bevor Sie den bestehenden Sparkconnext stoppen und einen neuen Sparkcontext erstellen? – hadooper

Verwandte Themen