2017-02-15 8 views
1

Ich habe den Code erfolgreich integriert, um Nachrichten vom Event-Hub abzurufen und sie über Spark/Spark-Streaming zu verarbeiten. Ich gehe jetzt in den Verwaltungszustand über, während die Nachrichten weitergeleitet werden. Dies ist der Code, den ich benutze, der zum größten Teil eine Adaption von https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlSpark-Streaming- und Azure-Event-Hubs mapWithState

Im Wesentlichen funktioniert dies mit einer Dummy-Quelle, es funktioniert mit einem einzigen Stream auf einer einzigen Partition, aber es funktioniert nicht für die gewerkschaftlich organisiert window stream .. Während ich mehrere Instanzen des Streams für jede Partition erstellen konnte, wird der Punkt der Verbindung und des Fensters irgendwie zerstört. + meine Versuche, es so zu machen, scheiterten. Ich bin irgendwie für Inspiration stecken, wo jetzt gehen .. wenn jemand eine Idee hat, die großartig sein würde ..

val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate() 

val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10)) 
streamingContext.checkpoint(inputOptions.checkpointDir) 

//derive the stream and window 
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters) 
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10)) 

val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L))) 
val stateSpec = StateSpec.function(trackStateFunc _) 
    .initialState(initialRDD) 
    .numPartitions(2) 
    .timeout(Seconds(60)) 

val eventStream = eventHubsWindowedStream 
    .map(messageStr => { 
    //parse teh event 
    var event = gson.fromJson(new String(messageStr), classOf[Event]) 

    //return a tuble of key/value pair 
    (event.product_id.toString, 1) 
    }) 

val eventStateStream = eventStream.mapWithState(stateSpec) 

val stateSnapshotStream = eventStateStream.stateSnapshots() 
stateSnapshotStream.print() 

stateSnapshotStream.foreachRDD { rdd => 
    import sparkSession.implicits._ 
    rdd.toDF("word", "count").registerTempTable("batch_word_count") 
} 

streamingContext.remember(Minutes(1)) 

streamingContext 
+0

* es funktioniert nicht für den gewerkschaftlich organisierten Fenster-Stream. * Was nicht funktioniert? –

+0

Entschuldigung, im Wesentlichen wird die Zustandsfunktion nie aufgerufen. Ich kann zu diesem Punkt nicht debuggen. Wenn ich den Beispielcode verwende, ist es in Ordnung .. und wenn ich einen einzelnen Stream verwende, ist es in Ordnung .. aber nicht, wenn ich den gewerkschaftlichen Stream oder das Fenster verwende. –

+0

Hast du versucht, in deiner IDE lokal zu debuggen? –

Antwort

0

ich mein Problem gelöst, dass ich am Ende des direkten Strom mit und all meinen Problemen sind weggegangen. Ich hatte dies vermieden, da das Fortschrittsverzeichnis nur HDFS oder ADL unterstützt und ich jetzt nicht mehr lokal testen kann.

EventHubsUtils.createDirectStreams (Streaming, inputOptions.namespace, inputOptions.hdfs, Karte (inputOptions.eventhub -> GetEventHubParams (inputOptions)))

Dennoch hat die Vereinigung Strom nicht funktioniert .. Jetzt ich nur muss herausfinden, wie man das Fortschrittsverzeichnis in HDFS löscht !!

Verwandte Themen