2017-08-10 2 views
6

Ich versuche, Strom mit zwei verschiedenen Fenstern zu aggregieren und es in die Konsole zu drucken. Es wird jedoch nur die erste Streaming-Abfrage gedruckt. Die tenSecsQ wird nicht in die Konsole gedruckt.Ausführen von separaten Streaming-Abfragen in Spark-Streaming-Streaming

SparkSession spark = SparkSession 
    .builder() 
    .appName("JavaStructuredNetworkWordCountWindowed") 
    .config("spark.master", "local[*]") 
    .getOrCreate(); 

Dataset<Row> lines = spark 
    .readStream() 
    .format("socket") 
    .option("host", host) 
    .option("port", port) 
    .option("includeTimestamp", true) 
    .load(); 

Dataset<Row> words = lines 
    .as(Encoders.tuple(Encoders.STRING(), Encoders.TIMESTAMP())) 
    .toDF("word", "timestamp"); 

// 5 second window 
Dataset<Row> fiveSecs = words 
    .groupBy(
     functions.window(words.col("timestamp"), "5 seconds"), 
     words.col("word") 
    ).count().orderBy("window"); 

// 10 second window 
Dataset<Row> tenSecs = words 
    .groupBy(
      functions.window(words.col("timestamp"), "10 seconds"), 
      words.col("word") 
    ).count().orderBy("window"); 

Trigger-Streaming-Abfrage für 5s und 10s aggregierte Streams. Die Ausgabe für 10s Stream wird nicht gedruckt. Nur 5s wird in Konsole gedruckt

// Start writeStream() for 5s window 
StreamingQuery fiveSecQ = fiveSecs.writeStream() 
    .queryName("5_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

// Start writeStream() for 10s window 
StreamingQuery tenSecsQ = tenSecs.writeStream() 
    .queryName("10_secs") 
    .outputMode("complete") 
    .format("console") 
    .option("truncate", "false") 
    .start(); 

tenSecsQ.awaitTermination(); 
+0

Eigentlich weiß ich nicht, wie der Socket-Stream funktioniert, aber für mich scheint Ihr erster Spark-Stream alle Daten aus dem Socket-Stream gelesen und nichts bleibt für die zweite. –

Antwort

5

Ich habe diese Frage untersucht.

Zusammenfassung: Jede Abfrage in Structured Streaming verbraucht die Daten source. Die Socket-Quelle erstellt für jede definierte Abfrage eine neue Verbindung. Das in diesem Fall angezeigte Verhalten ist, weil nur die Eingabedaten an die erste Verbindung liefert.

Fortan ist es nicht möglich, mehrere Aggregationen über die Socket-Verbindung zu definieren, es sei denn, wir können sicherstellen, dass die verbundene Socket-Quelle die gleichen Daten für jede geöffnete Verbindung liefert.


Ich habe diese Frage auf der Spark-Mailingliste diskutiert. Databricks Entwickler Shixiong Zhu antwortete:

Spark erstellt eine Verbindung für jede Abfrage. Das Verhalten, das Sie beobachtet haben, ist, weil wie "nc -lk" funktioniert. Wenn Sie netstat verwenden, um die TCP-Verbindungen zu überprüfen, werden zwei Verbindungen beim Starten von zwei Abfragen angezeigt. "Nc" leitet die Eingabe jedoch nur an eine Verbindung weiter.

prüft ich dieses Verhalten durch ein kleines Experiment zu definieren: Zuerst habe ich eine SimpleTCPWordServer erstellt, die jede Verbindung offen und eine grundlegende Structured Streaming Job zufällige Wörter liefert, die zwei Abfragen erklärt. Der einzige Unterschied zwischen ihnen ist, dass die zweite Abfrage eine zusätzliche Konstante Spalte definiert seinen Ausgang zu unterscheiden:

val lines = spark 
    .readStream 
    .format("socket") 
    .option("host", "localhost") 
    .option("port", "9999") 
    .option("includeTimestamp", true) 
    .load() 

val q1 = lines.writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("5 seconds")) 
    .start() 

val q2 = lines.withColumn("foo", lit("foo")).writeStream 
    .outputMode("append") 
    .format("console") 
    .trigger(Trigger.ProcessingTime("7 seconds")) 
    .start() 

Wenn StructuredStreaming nur ein Strom verbrauchen würde, dann sollten wir die gleichen Worte von beiden Abfragen geliefert sehen. In dem Fall, dass jede Abfrage einen separaten Stream verbraucht, werden von jeder Abfrage verschiedene Wörter gemeldet.

Dies ist die beobachtete Ausgabe:

------------------------------------------- 
Batch: 0 
------------------------------------------- 
+--------+-------------------+ 
| value|   timestamp| 
+--------+-------------------+ 
|champion|2017-08-14 13:54:51| 
+--------+-------------------+ 

+------+-------------------+---+ 
| value|   timestamp|foo| 
+------+-------------------+---+ 
|belong|2017-08-14 13:54:51|foo| 
+------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+-------+-------------------+---+ 
| value|   timestamp|foo| 
+-------+-------------------+---+ 
| agenda|2017-08-14 13:54:52|foo| 
|ceiling|2017-08-14 13:54:52|foo| 
| bear|2017-08-14 13:54:53|foo| 
+-------+-------------------+---+ 

------------------------------------------- 
Batch: 1 
------------------------------------------- 
+----------+-------------------+ 
|  value|   timestamp| 
+----------+-------------------+ 
| breath|2017-08-14 13:54:52| 
|anticipate|2017-08-14 13:54:52| 
| amazing|2017-08-14 13:54:52| 
| bottle|2017-08-14 13:54:53| 
| calculate|2017-08-14 13:54:53| 
|  asset|2017-08-14 13:54:54| 
|  cell|2017-08-14 13:54:54| 
+----------+-------------------+ 

Wir klar sehen können, dass die Ströme für jede Abfrage unterschiedlich sind. Es sieht so aus, als ob es nicht möglich ist, mehrere Aggregationen über die von socket source gelieferten Daten zu definieren, es sei denn, wir können garantieren, dass der TCP-Backend-Server genau die gleichen Daten für jede offene Verbindung liefert.

Verwandte Themen