Ich habe eine folgende Aufgabe vor mir.Spark und Cassandra Parallelverarbeitung
Der Benutzer stellt dem IP-Adresssatz eine Konfigurationsdatei zur Verfügung, während er den Spark Submit-Befehl ausführt.
können sagen, dass Array wie folgt aussieht:
val ips = Array(1,2,3,4,5)
Es kann bis in Array zu 100.000 Werte ..
Für alle Elemente in einem Array, sollte ich Daten für Cassandra lesen, führen etwas Berechnung und Einfügen von Daten zurück zu Cassandra.
Wenn ich tun:
ips.foreach(ip =>{
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP)
- process it
- save it back to Cassandra})
dies gut funktioniert.
Ich glaube, dass der Prozess sequenziell abläuft; Ich nutze keine Parallelität.
Auf der anderen Seite, wenn ich tun:
val IPRdd = sc.parallelize(Array(1,2,3,4,5))
IPRdd.foreach(ip => {
- read data from Cassandra // I need to use spark context to make the query
-process it
save it back to Cassandra})
I Serialisierung Ausnahme erhalten, weil Funken versucht Funken Zusammenhang serialisiert werden, die nicht serialisierbar ist.
Wie dies funktioniert, aber immer noch Parallelität ausnutzen.
Dank
Herausgegeben
Dies ist execption ich:
Exception in thread "main" org.apache.spark.SparkException: Aufgabe serializable nicht bei org. apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) bei org.apache.spark.util.ClosureCleaner $ .org $ apache $ funken $ util $ ClosureCleaner $$ sauber (ClosureCleaner.scala: 294) bei org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 122) bei org.apache.spark.SparkContext.clean (SparkContext.scala: 2055) bei org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 919) bei org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 918) bei org.apache.spark.rdd .RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) unter org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 111) unter org.apache.spark.rdd.RDD.withScope (RDD.scala : 316) bei org.apache.spark.rdd.RDD.foreachPartition (RDD.scala: 918) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply (WibeeeBatchJob.scala: 59) bei com.e nerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.Anwendung (WibeeeBatchJob.scala: 54) bei scala.collection.IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized.scala: 33) bei scala.collection.mutable. ArrayOps $ ofRef.foreach (ArrayOps.scala: 108) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $ .main (WibeeeBatchJob.scala: 54) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main (WibeeeBatchJob.scala) bei sun.reflect.NativeMethodAccessorImpl.invoke0 (native Methode) bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) bei sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) bei java.lang.reflect.Method.invoke (Method.java:498) bei org.apache. spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain (SparkSubmit.scala: 731) bei org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1 (SparkSubmit.scala: 181) um org.apache.spark.ploy.SparkSubmit $ .submit (SparkSubmit.scala: 206) bei org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 121) bei org.apache.spark.deploy. SparkSubmit.main (SparkSubmit.scala) verursacht durch: java.io.NotSerializableException: org.apache.spark.SparkContext Serialisierungsstapel: - Objekt nicht serialisierbar (Klasse: org.apache.spark.SparkContext, Wert: [email protected]) - Feld (Klasse: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1, Name: sc $ 1, geben Sie ein: class org.apache.spark.SparkContext) - Objekt (Klasse com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,) - Feld (Klasse: com. enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ bewerben $ 1, name: $ outer, typ: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1) - Objekt (Klasse com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ gilt $ 1,) bei org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) unter org.apache.spark.serializer.JavaSerializationS tream.writeObject (JavaSerializer.scala: 47) bei org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 101) bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 301)
Was ist die spezifische Ausnahme und der spezifische Code, der das verursacht? –