2017-08-25 2 views
1

Wir entwickeln eine Spark-Streaming-ETL-Anwendung, die die Daten von Kafka bezieht, notwendige Transformationen anwendet und die Daten in MongoDB lädt. Die von Kafka erhaltenen Daten sind im JSON-Format. Die Transformationen werden auf jedes Element (JSON-String) der RDD basierend auf den von MongoDB abgerufenen Suchdaten angewendet. Da sich die Suchdaten ändern, muss ich sie für jedes Batch-Intervall abrufen. Die Suchdaten werden mit SqlContext.read von MongoDB gelesen. Ich konnte SqlContext.read nicht in DStream.transform verwenden, da der SqlContext nicht serialisierbar ist, sodass ich ihn nicht an die Worker-Knoten senden kann. Jetzt versuche ich mit DStream.foreREDD, in dem ich die Daten von MongoDB hole und die Nachschlagedaten an die Arbeiter ausstrahle. Alle Transformationen auf den RDD-Elementen werden innerhalb der Schließung von rdd.map ausgeführt, die die übertragenen Daten verwendet und Transformationen durchführt und eine RDD zurückgibt. Die RDD wird dann in einen Datenrahmen konvertiert und in MongoDB geschrieben. Derzeit läuft diese Anwendung sehr langsam.DStream RDD mit externen Daten transformieren

PS: Wenn ich den Teil des Codes verschiebe, der die Suchdaten aus DStream.foreachRDD holt und DStream.transform hinzufügt, um Transformationen anzuwenden, und DStream.foreRADD nur die Daten in MongoDB einfügen, ist die Leistung sehr gut. Bei diesem Ansatz werden die Suchdaten jedoch nicht für jedes Stapelintervall aktualisiert.

Ich suche Hilfe, um zu verstehen, ob dies ein guter Ansatz ist, und ich suche nach einer Anleitung zur Verbesserung der Leistung.

Es folgt eine Pseudocode

package com.testing 


object Pseudo_Code { 
    def main(args: Array[String]) { 

    val sparkConf = new SparkConf().setAppName("Pseudo_Code") 
     .setMaster("local[4]") 

    val sc = new SparkContext(sparkConf) 
    sc.setLogLevel("ERROR") 

    val sqlContext = new SQLContext(sc) 
    val ssc = new StreamingContext(sc, Seconds(1)) 

    val mongoIP = "127.0.0.1:27017" 

    val DBConnectionURI = "mongodb://" + mongoIP + "/" + "DBName" 

    val bootstrap_server_config = "127.0.0.100:9092" 
    val zkQuorum = "127.0.0.101:2181" 

    val group = "streaming" 

    val TopicMap = Map("sampleTopic" -> 1) 


    val KafkaDStream = KafkaUtils.createStream(ssc, zkQuorum, group, TopicMap).map(_._2) 

    KafkaDStream.foreachRDD{rdd => 
     if (rdd.count() > 0) { 

     //This lookup data has information required to perform transformation 
     //This information keeps changing, so the data should be fetched for every batch interval 

     val lookup1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") 
     .option("spark.mongodb.input.uri", DBConnectionURI) 
     .option("spark.mongodb.input.collection", "lookupCollection1") 
     .load() 

     val broadcastLkp1 = sc.broadcast(lookup1) 

     val new_rdd = rdd.map{elem => 
     val json: JValue = parse(elem) 

     //Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data 
     //"value" extracted from json 
     val param1 = broadcastLkp1.value.filter(broadcastLkp1.value("key")==="value").select("param1").distinct() 
     val param1ReplaceNull = if(param1.count() == 0){ 
            "constant" 
           } 
           else{ 
            param1.head().getString(0) 
           } 
     //create a new JSON with a different structure 
     val new_JSON = """""" 

     compact(render(new_JSON)) 
    } 

    val targetSchema = new StructType(Array(StructField("key1",StringType,true) 
                ,StructField("key2",TimestampType,true))) 
    val transformedDf = sqlContext.read.schema(targetSchema).json(new_rdd) 


    transformedDf.write 
      .option("spark.mongodb.output.uri",DBConnectionURI) 
      .option("collection", "tagetCollectionName") 
      .mode("append").format("com.mongodb.spark.sql").save() 
     } 
    } 

    // Run the streaming job 
    ssc.start() 
    ssc.awaitTermination() 
    } 




} 
+0

Sie haben hier ein interessantes Problem. Dinge, die du beachten solltest: Kannst du sowohl von deinem Kafka als auch von deiner MongoDB streamen? Wenn dies der Fall ist, könnten Sie beide DStreams gleichzeitig bearbeiten. –

+0

@MichelLemay Haben Sie ein Beispiel, wie Sie von mongoDB streamen können. Ich kann es versuchen. Im Moment konnte ich mich ein wenig vorwärts bewegen, indem ich einige Anweisungen in https://stackoverflow.com/questions/37638519/spark-streaming-how-to-periodically-refresh-cached-rdd befolgte. Ich habe eine DStream.foreachRDD erstellt, in der ich die Nachschlagedaten neu lade, dann DStream.transform, wo die Nachschlagedaten verwendet werden und eine neue RDD zurückgibt, dann eine weitere foreachRDD, um Daten in mongoDB einzufügen. Das funktioniert, aber die Leistung ist sehr schlecht. – Sid

+0

haben Sie versucht, die Funktion "from_json" in der Datenframe-API für Ihre json-Transformationen verfügbar zu machen? Sie könnten strukturiertes Streaming versuchen (wenn Ihr Treiber .writeStream unterstützt). val msgSchema = Encoders.product [Nachricht] .schema val ds = df .select (von_json ($ "value" .cast ("string"), msgSchema) .as [Nachricht]) – sgireddy

Antwort

0

Bei der Erforschung, dass die Lösung gearbeitet ist der gesendete Datenrahmen cachen, nachdem sie von den Arbeitern gelesen wird. Im Folgenden ist die Codeänderung, die ich tun musste, um die Leistung zu verbessern.

val new_rdd = rdd.map{elem => 
     val json: JValue = parse(elem) 

     //Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data 
     //"value" extracted from json 
     val lkp_bd = broadcastLkp1.value 
     lkp_bd.cache() 
     val param1 = lkp_bd.filter(broadcastLkp1.value("key")==="value").select("param1").distinct() 
     val param1ReplaceNull = if(param1.count() == 0){ 
            "constant" 
           } 
           else{ 
            param1.head().getString(0) 
           } 
     //create a new JSON with a different structure 
     val new_JSON = """""" 

     compact(render(new_JSON)) 
    } 

Nebenbei bemerkt, dieser Ansatz hat ein Problem, wenn auf dem Cluster ausgeführt wird. Beim Zugriff auf den übertragenen Datenrahmen wird eine Nullzeiger-Ausnahme ausgelöst. Ich habe einen anderen Thread erstellt. Spark Streaming - null pointer exception while accessing broadcast variable

+0

Dies liegt wahrscheinlich daran, dass die Variablenauswertung vor der Verfügbarkeit der Broadcastvariablen erfolgt. Vielleicht möchten Sie Ihre Variablen als faul markieren, damit sie bis zur ersten Verwendung warten. Eine weitere Option besteht darin, Broadcast-Variablen ganz zu vermeiden, indem Sie direkt auf das zugreifen, was Sie vom Executor benötigen. Objekt GetMetaData {@Transient Lazy val MetaData = getData; def getData = ...} Der Schlüssel besteht darin, das Objekt als transient zu markieren, damit der spark serializer es ignoriert und es in ein Objekt einfügt. – sgireddy