2016-05-30 12 views
1

Ich versuche, Elemente mit Hilfe von SparkStreaming (Sammeln von Nachrichten aus einer Kafka-Warteschlange) zu TitanDB hinzuzufügen. Aber es scheint, dass es schwerer als erwartet ist. Hier ist die Definition der Titan-Verbindung:Einfügen von Daten in TitanDB mithilfe von Spark (oder SparkStreaming)

val confPath: String = "titan-cassandra-es-spark.properties" 
val conn: TitanModule = new TitanModule(confPath) 

Titan-Modul ist eine Serializable Klasse, die die TitanDB Verbindung zu konfigurieren:

... 
val configurationFilePath: String = confFilePath 
val configuration = new PropertiesConfiguration(configurationFilePath) 
val gConn: TitanGraph = TitanFactory.open(configuration) 
... 

Wenn ich den sparkStreaming Job auszuführen, die Nachrichten (json) von einem Kafka sammeln Queue, empfängt sie die Nachricht und versucht, sie in TitanDB hinzuzufügen, explodiert sie mit dem folgenden StackTrace.

Wissen Sie, ob das Hinzufügen von Daten in TitanDB mit SparkStreaming möglich ist? Wissen Sie, was könnte die Lösung dafür sein?

18:03:50,596 ERROR JobScheduler:95 - Error running job streaming job 1464624230000 ms.0 
org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:200) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:132) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration 
Serialization stack: 
     - object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: [email protected]8) 
     - field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration) 
     - object (class salvob.TitanModule, [email protected]) 
     - field (class: salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, name: conn$1, type: class salvob.TitanModule) 
     - object (class salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
     ... 28 more 
Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
     at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
     at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
     at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:200) 
     at salvob.SparkConsumer$$anonfun$main$1.apply(SparkConsumer.scala:132) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) 
     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) 
     at scala.util.Try$.apply(Try.scala:161) 
     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) 
     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.io.NotSerializableException: org.apache.commons.configuration.PropertiesConfiguration 
Serialization stack: 
     - object not serializable (class: org.apache.commons.configuration.PropertiesConfiguration, value: [email protected]8) 
     - field (class: salvob.TitanModule, name: configuration, type: class org.apache.commons.configuration.PropertiesConfiguration) 
     - object (class salvob.TitanModule, [email protected]) 
     - field (class: salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, name: conn$1, type: class salvob.TitanModule) 
     - object (class salvob.SparkConsumer$$anonfun$main$1$$anonfun$apply$3, <function1>) 
     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) 
     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) 
     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) 
     ... 28 more 

Antwort

1

Spark Streaming produziert RDDs. Die Verarbeitung der Daten in den RDDs erfolgt auf den Worker-Knoten. Der Code, den Sie in rdd.map() schreiben, wird zusammen mit den Objekten, auf die in diesem Block verwiesen wird, serialisiert und zur Verarbeitung an den Worker-Knoten gesendet.

So ideale Weise die Grafik Beispiel durch Funken zu verwenden, ist die folgende:

streamRdd.map(kafkaTuple => { 
    // create graph instance 
    // use graph instance to add/modify graph 
    // close graph instance 
}) 

Aber das wird eine neue Diagramm-Instanz für jede Zeile erstellen. Als Optimierung können Sie die grafische Darstellung Instanz pro Instanz

rdd.foreachPartition((rddRows: Iterator[kafkaTuple]) => { 
     val graph: TitanGraph = // create titan instance 
     val trans: TitanTransaction = graph.newTransaction() 

     rddRows.foreach(graphVertex => { 
     // do graph insertion in the above transaction 
     }) 

     createVertexTrans.commit() 
     graph.close() 
}) 

graph.newTransaction() Hier hilft in Multi-Threaded Graph Updates erstellen. Andernfalls erhalten Sie Sperrenausnahmen.

Einzige Sache ist, dass, je nachdem, was ich bisher gelesen habe, gibt es keine direkte Unterstützung für Multi-Knoten-Update. Von dem, was ich gesehen habe, aktualisiert Titan Transaction HBase mit einer Sperre, wenn es versucht, einen Scheitelpunkt zu ändern. Daher schlagen andere Partitionen fehl, wenn sie versuchen, Aktualisierungen vorzunehmen. Sie müssen einen externen Synchronisationsmechanismus erstellen oder Ihre rdd in eine einzelne Partition partitionieren und dann den obigen Code verwenden, um Aktualisierungen durchzuführen.

+0

danke für die Antwort. Ich verwende nicht HBase, aber ich denke, das ist ein Problem, was auch immer der Speicher ist, den Titan benutzt (ich benutze Cassandra) – salvob

+0

Ja. Speicherwahl sollte nicht wichtig sein. Konnten Sie Insertionen über mehrere Knoten simultan durchführen? –

0

Stellen Sie sicher, dass alle Klassen, die an andere untergeordnete Maschinen weitergegeben werden können, serialisierbar sind. Es ist ziemlich wichtig. Initialisieren Sie keine Variablen außerhalb dieser übergebenen Klassen.

Ich habe Apache Spark (nicht Streaming) verwendet und es hat gut funktioniert. Es war nicht ganz einfach, es richtig zu machen, da Titan eine Version von Spark verwendet. Also würde es einige Abhängigkeitskonflikte geben. Dies ist die einzige Version, die funktioniert

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.2.2</version> 
</dependency> 

So habe ich den Cluster gestartet.

analysieren dann die Daten

JavaRDD<T> javaRDD = initRetriever(); // init JavaRDD 
javaRDD.foreachPartition(iter->{ 
    Graph graph= initGraph(); 
    Parser<T> parser= initParser(graph); 
    while(iter.hasNext()){ 
     try { 
      parser.parse(iter); // extends serializable ! 
     } catch (Exception e) { 
      logger.error("Failed in importing all vertices ", e); 
      graph.tx().rollback(); 
     } 
    } 
    graph.tx().commit(); 
}); 

ich dieses Modul auf Github zu lösen möglicherweise in der Lage, wenn es notwendig ist.

+0

Vielen Dank für Ihre Antwort. Ich benutze Java nicht, aber Scala, aber das sollte kein Problem sein. Was ist deine Titanversion? In Ihrem Code sehe ich jedoch nicht, wo die Eröffnungskonfiguration von TitanDB ist. Der Github Repo könnte nützlich sein, wenn Sie das nicht stört. – salvob

+0

Es ist Titan 1.0. Titan DB-Konfiguration ist nur die Standard-Version mit seiner Version ausgeliefert. Der Code ist nicht ganz meinerseits, ich werde das mit verbundenen Partnern besprechen und diese Antwort aktualisieren, wenn ich sie veröffentliche. –

+0

danke. Haben Sie Spark zum Einfügen oder für Traversal verwendet? Ich bin mir bewusst, dass es schwerer ist einzufügen als das Diagramm – salvob

Verwandte Themen