Wir entwickeln eine Spark-Streaming-ETL-Anwendung, die die Daten von Kafka bezieht, notwendige Transformationen anwendet und die Daten in MongoDB lädt. Die von Kafka erhaltenen Daten sind im JSON-Format. Die Transformationen werden auf jedes Element (JSON-String) der RDD basierend auf den von MongoDB abgerufenen Suchdaten angewendet. Da sich die Suchdaten ändern, muss ich sie für jedes Batch-Intervall abrufen. Die Suchdaten werden mit SqlContext.read von MongoDB gelesen. Ich konnte SqlContext.read nicht in DStream.transform verwenden, da der SqlContext nicht serialisierbar ist, sodass ich ihn nicht an die Worker-Knoten senden kann. Jetzt versuche ich mit DStream.foreREDD, in dem ich die Daten von MongoDB hole und die Nachschlagedaten an die Arbeiter ausstrahle. Alle Transformationen auf den RDD-Elementen werden innerhalb der Schließung von rdd.map ausgeführt, die die übertragenen Daten verwendet und Transformationen durchführt und eine RDD zurückgibt. Die RDD wird dann in einen Datenrahmen konvertiert und in MongoDB geschrieben. Derzeit läuft diese Anwendung sehr langsam.DStream RDD mit externen Daten transformieren
PS: Wenn ich den Teil des Codes verschiebe, der die Suchdaten aus DStream.foreachRDD holt und DStream.transform hinzufügt, um Transformationen anzuwenden, und DStream.foreRADD nur die Daten in MongoDB einfügen, ist die Leistung sehr gut. Bei diesem Ansatz werden die Suchdaten jedoch nicht für jedes Stapelintervall aktualisiert.
Ich suche Hilfe, um zu verstehen, ob dies ein guter Ansatz ist, und ich suche nach einer Anleitung zur Verbesserung der Leistung.
Es folgt eine Pseudocode
package com.testing
object Pseudo_Code {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Pseudo_Code")
.setMaster("local[4]")
val sc = new SparkContext(sparkConf)
sc.setLogLevel("ERROR")
val sqlContext = new SQLContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))
val mongoIP = "127.0.0.1:27017"
val DBConnectionURI = "mongodb://" + mongoIP + "/" + "DBName"
val bootstrap_server_config = "127.0.0.100:9092"
val zkQuorum = "127.0.0.101:2181"
val group = "streaming"
val TopicMap = Map("sampleTopic" -> 1)
val KafkaDStream = KafkaUtils.createStream(ssc, zkQuorum, group, TopicMap).map(_._2)
KafkaDStream.foreachRDD{rdd =>
if (rdd.count() > 0) {
//This lookup data has information required to perform transformation
//This information keeps changing, so the data should be fetched for every batch interval
val lookup1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
.option("spark.mongodb.input.uri", DBConnectionURI)
.option("spark.mongodb.input.collection", "lookupCollection1")
.load()
val broadcastLkp1 = sc.broadcast(lookup1)
val new_rdd = rdd.map{elem =>
val json: JValue = parse(elem)
//Foreach element in rdd, there are some values that should be looked up from the broadcasted lookup data
//"value" extracted from json
val param1 = broadcastLkp1.value.filter(broadcastLkp1.value("key")==="value").select("param1").distinct()
val param1ReplaceNull = if(param1.count() == 0){
"constant"
}
else{
param1.head().getString(0)
}
//create a new JSON with a different structure
val new_JSON = """"""
compact(render(new_JSON))
}
val targetSchema = new StructType(Array(StructField("key1",StringType,true)
,StructField("key2",TimestampType,true)))
val transformedDf = sqlContext.read.schema(targetSchema).json(new_rdd)
transformedDf.write
.option("spark.mongodb.output.uri",DBConnectionURI)
.option("collection", "tagetCollectionName")
.mode("append").format("com.mongodb.spark.sql").save()
}
}
// Run the streaming job
ssc.start()
ssc.awaitTermination()
}
}
Sie haben hier ein interessantes Problem. Dinge, die du beachten solltest: Kannst du sowohl von deinem Kafka als auch von deiner MongoDB streamen? Wenn dies der Fall ist, könnten Sie beide DStreams gleichzeitig bearbeiten. –
@MichelLemay Haben Sie ein Beispiel, wie Sie von mongoDB streamen können. Ich kann es versuchen. Im Moment konnte ich mich ein wenig vorwärts bewegen, indem ich einige Anweisungen in https://stackoverflow.com/questions/37638519/spark-streaming-how-to-periodically-refresh-cached-rdd befolgte. Ich habe eine DStream.foreachRDD erstellt, in der ich die Nachschlagedaten neu lade, dann DStream.transform, wo die Nachschlagedaten verwendet werden und eine neue RDD zurückgibt, dann eine weitere foreachRDD, um Daten in mongoDB einzufügen. Das funktioniert, aber die Leistung ist sehr schlecht. – Sid
haben Sie versucht, die Funktion "from_json" in der Datenframe-API für Ihre json-Transformationen verfügbar zu machen? Sie könnten strukturiertes Streaming versuchen (wenn Ihr Treiber .writeStream unterstützt). val msgSchema = Encoders.product [Nachricht] .schema val ds = df .select (von_json ($ "value" .cast ("string"), msgSchema) .as [Nachricht]) – sgireddy