2016-04-21 9 views
0

Ich habe eine folgende Aufgabe vor mir.Spark und Cassandra Parallelverarbeitung

Der Benutzer stellt dem IP-Adresssatz eine Konfigurationsdatei zur Verfügung, während er den Spark Submit-Befehl ausführt.

können sagen, dass Array wie folgt aussieht:

val ips = Array(1,2,3,4,5)

Es kann bis in Array zu 100.000 Werte ..

Für alle Elemente in einem Array, sollte ich Daten für Cassandra lesen, führen etwas Berechnung und Einfügen von Daten zurück zu Cassandra.

Wenn ich tun:

ips.foreach(ip =>{ 
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP) 
- process it 
- save it back to Cassandra}) 

dies gut funktioniert.

Ich glaube, dass der Prozess sequenziell abläuft; Ich nutze keine Parallelität.

Auf der anderen Seite, wenn ich tun:

val IPRdd = sc.parallelize(Array(1,2,3,4,5)) 
IPRdd.foreach(ip => { 
- read data from Cassandra // I need to use spark context to make the query 
-process it 
save it back to Cassandra}) 

I Serialisierung Ausnahme erhalten, weil Funken versucht Funken Zusammenhang serialisiert werden, die nicht serialisierbar ist.

Wie dies funktioniert, aber immer noch Parallelität ausnutzen.

Dank

Herausgegeben

Dies ist execption ich:

Exception in thread "main" org.apache.spark.SparkException: Aufgabe serializable nicht bei org. apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 304) bei org.apache.spark.util.ClosureCleaner $ .org $ apache $ funken $ util $ ClosureCleaner $$ sauber (ClosureCleaner.scala: 294) bei org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 122) bei org.apache.spark.SparkContext.clean (SparkContext.scala: 2055) bei org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 919) bei org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply (RDD.scala: 918) bei org.apache.spark.rdd .RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) unter org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 111) unter org.apache.spark.rdd.RDD.withScope (RDD.scala : 316) bei org.apache.spark.rdd.RDD.foreachPartition (RDD.scala: 918) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply (WibeeeBatchJob.scala: 59) bei com.e nerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.Anwendung (WibeeeBatchJob.scala: 54) bei scala.collection.IndexedSeqOptimized $ class.foreach (IndexedSeqOptimized.scala: 33) bei scala.collection.mutable. ArrayOps $ ofRef.foreach (ArrayOps.scala: 108) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $ .main (WibeeeBatchJob.scala: 54) bei com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main (WibeeeBatchJob.scala) bei sun.reflect.NativeMethodAccessorImpl.invoke0 (native Methode) bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) bei sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) bei java.lang.reflect.Method.invoke (Method.java:498) bei org.apache. spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain (SparkSubmit.scala: 731) bei org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1 (SparkSubmit.scala: 181) um org.apache.spark.ploy.SparkSubmit $ .submit (SparkSubmit.scala: 206) bei org.apache.spark.deploy.SparkSubmit $ .main (SparkSubmit.scala: 121) bei org.apache.spark.deploy. SparkSubmit.main (SparkSubmit.scala) verursacht durch: java.io.NotSerializableException: org.apache.spark.SparkContext Serialisierungsstapel: - Objekt nicht serialisierbar (Klasse: org.apache.spark.SparkContext, Wert: [email protected]) - Feld (Klasse: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1, Name: sc $ 1, geben Sie ein: class org.apache.spark.SparkContext) - Objekt (Klasse com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,) - Feld (Klasse: com. enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ bewerben $ 1, name: $ outer, typ: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1) - Objekt (Klasse com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ gilt $ 1,) bei org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala: 40) unter org.apache.spark.serializer.JavaSerializationS tream.writeObject (JavaSerializer.scala: 47) bei org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 101) bei org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 301)

+0

Was ist die spezifische Ausnahme und der spezifische Code, der das verursacht? –

Antwort

1

Am einfachsten ist es, die Spark Cassandra Connector zu verwenden, die Verbindungspooling und Serialisierung verarbeiten kann.

Damit Sie so etwas wie

sc.parallelize(inputData, numTasks) 
    .mapPartitions { it => 
    val con = CassandraConnection(yourConf) 
    con.withSessionDo{ session => 
     //Use the session 
    } 
    //Do any other processing 
    }.saveToCassandra("ks","table" 

Dies wäre vollständig manuelle Bedienung eines Cassandra Verbindung tun könnte. Die Sitzungen würden alle automatisch gepoolt und zwischengespeichert werden. Wenn Sie eine Anweisung vorbereiten, werden diese ebenfalls auf dem Executor zwischengespeichert. Wenn Sie mehr integrierte Methoden verwenden möchten, gibt es auch joinWithCassandraTable, die in Ihrer Situation funktionieren können.

sc.parallelize(inputData, numTasks) 
    .joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key 
    .map(//manipulate returned results if needed) 
    .saveToCassandra("ks","table") 
+0

Sie haben meine Frage nicht verstanden. Es war, ob ich zum Beispiel 100 verschiedene Abfragen (verschiedene Parameter der Abfragen) im Parallelmodus ausführen kann, so dass die Executoren derjenige sind, der die Abfrage durchführt (was bedeutet, dass sie die Instanz des Spark Context benötigen)) ... Aber ich erkannte, dass Funkenkontext nicht zu Executors geschickt werden kann und sollte, also muss ich die Struktur meiner Pipeline ändern. –

+0

Die Beispiele, die ich gab, haben beide die Executors, die die Fragen erledigen. – RussS

Verwandte Themen