Ich habe diese Frage untersucht.
Zusammenfassung: Jede Abfrage in Structured Streaming verbraucht die Daten source
. Die Socket-Quelle erstellt für jede definierte Abfrage eine neue Verbindung. Das in diesem Fall angezeigte Verhalten ist, weil nur die Eingabedaten an die erste Verbindung liefert.
Fortan ist es nicht möglich, mehrere Aggregationen über die Socket-Verbindung zu definieren, es sei denn, wir können sicherstellen, dass die verbundene Socket-Quelle die gleichen Daten für jede geöffnete Verbindung liefert.
Ich habe diese Frage auf der Spark-Mailingliste diskutiert. Databricks Entwickler Shixiong Zhu antwortete:
Spark erstellt eine Verbindung für jede Abfrage. Das Verhalten, das Sie beobachtet haben, ist, weil wie "nc -lk" funktioniert. Wenn Sie netstat
verwenden, um die TCP-Verbindungen zu überprüfen, werden zwei Verbindungen beim Starten von zwei Abfragen angezeigt. "Nc" leitet die Eingabe jedoch nur an eine Verbindung weiter.
prüft ich dieses Verhalten durch ein kleines Experiment zu definieren: Zuerst habe ich eine SimpleTCPWordServer
erstellt, die jede Verbindung offen und eine grundlegende Structured Streaming Job zufällige Wörter liefert, die zwei Abfragen erklärt. Der einzige Unterschied zwischen ihnen ist, dass die zweite Abfrage eine zusätzliche Konstante Spalte definiert seinen Ausgang zu unterscheiden:
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.option("includeTimestamp", true)
.load()
val q1 = lines.writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("5 seconds"))
.start()
val q2 = lines.withColumn("foo", lit("foo")).writeStream
.outputMode("append")
.format("console")
.trigger(Trigger.ProcessingTime("7 seconds"))
.start()
Wenn StructuredStreaming nur ein Strom verbrauchen würde, dann sollten wir die gleichen Worte von beiden Abfragen geliefert sehen. In dem Fall, dass jede Abfrage einen separaten Stream verbraucht, werden von jeder Abfrage verschiedene Wörter gemeldet.
Dies ist die beobachtete Ausgabe:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------+-------------------+
| value| timestamp|
+--------+-------------------+
|champion|2017-08-14 13:54:51|
+--------+-------------------+
+------+-------------------+---+
| value| timestamp|foo|
+------+-------------------+---+
|belong|2017-08-14 13:54:51|foo|
+------+-------------------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+-------+-------------------+---+
| value| timestamp|foo|
+-------+-------------------+---+
| agenda|2017-08-14 13:54:52|foo|
|ceiling|2017-08-14 13:54:52|foo|
| bear|2017-08-14 13:54:53|foo|
+-------+-------------------+---+
-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
| value| timestamp|
+----------+-------------------+
| breath|2017-08-14 13:54:52|
|anticipate|2017-08-14 13:54:52|
| amazing|2017-08-14 13:54:52|
| bottle|2017-08-14 13:54:53|
| calculate|2017-08-14 13:54:53|
| asset|2017-08-14 13:54:54|
| cell|2017-08-14 13:54:54|
+----------+-------------------+
Wir klar sehen können, dass die Ströme für jede Abfrage unterschiedlich sind. Es sieht so aus, als ob es nicht möglich ist, mehrere Aggregationen über die von socket source
gelieferten Daten zu definieren, es sei denn, wir können garantieren, dass der TCP-Backend-Server genau die gleichen Daten für jede offene Verbindung liefert.
Eigentlich weiß ich nicht, wie der Socket-Stream funktioniert, aber für mich scheint Ihr erster Spark-Stream alle Daten aus dem Socket-Stream gelesen und nichts bleibt für die zweite. –