Ich habe den Code erfolgreich integriert, um Nachrichten vom Event-Hub abzurufen und sie über Spark/Spark-Streaming zu verarbeiten. Ich gehe jetzt in den Verwaltungszustand über, während die Nachrichten weitergeleitet werden. Dies ist der Code, den ich benutze, der zum größten Teil eine Adaption von https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlSpark-Streaming- und Azure-Event-Hubs mapWithState
Im Wesentlichen funktioniert dies mit einer Dummy-Quelle, es funktioniert mit einem einzigen Stream auf einer einzigen Partition, aber es funktioniert nicht für die gewerkschaftlich organisiert window stream .. Während ich mehrere Instanzen des Streams für jede Partition erstellen konnte, wird der Punkt der Verbindung und des Fensters irgendwie zerstört. + meine Versuche, es so zu machen, scheiterten. Ich bin irgendwie für Inspiration stecken, wo jetzt gehen .. wenn jemand eine Idee hat, die großartig sein würde ..
val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate()
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))
streamingContext.checkpoint(inputOptions.checkpointDir)
//derive the stream and window
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10))
val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L)))
val stateSpec = StateSpec.function(trackStateFunc _)
.initialState(initialRDD)
.numPartitions(2)
.timeout(Seconds(60))
val eventStream = eventHubsWindowedStream
.map(messageStr => {
//parse teh event
var event = gson.fromJson(new String(messageStr), classOf[Event])
//return a tuble of key/value pair
(event.product_id.toString, 1)
})
val eventStateStream = eventStream.mapWithState(stateSpec)
val stateSnapshotStream = eventStateStream.stateSnapshots()
stateSnapshotStream.print()
stateSnapshotStream.foreachRDD { rdd =>
import sparkSession.implicits._
rdd.toDF("word", "count").registerTempTable("batch_word_count")
}
streamingContext.remember(Minutes(1))
streamingContext
* es funktioniert nicht für den gewerkschaftlich organisierten Fenster-Stream. * Was nicht funktioniert? –
Entschuldigung, im Wesentlichen wird die Zustandsfunktion nie aufgerufen. Ich kann zu diesem Punkt nicht debuggen. Wenn ich den Beispielcode verwende, ist es in Ordnung .. und wenn ich einen einzelnen Stream verwende, ist es in Ordnung .. aber nicht, wenn ich den gewerkschaftlichen Stream oder das Fenster verwende. –
Hast du versucht, in deiner IDE lokal zu debuggen? –