2017-09-11 4 views
0

Ist es in Apache Flink möglich, einen neuen Datenstrom während der Laufzeit dynamisch hinzuzufügen, ohne den Job neu zu starten?Apache Flink neuen Stream dynamisch hinzufügen

Soweit ich verstand, ein übliches Flink Programm sieht wie folgt aus:

val env = StreamExecutionEnvironment.getExecutionEnvironment() 
val text = env.socketTextStream(hostname, port, "\n") 
val windowCounts = text.map... 

env.execute("Socket Window WordCount") 

In meinem Fall ist es möglich, dass zum Beispiel Ein neues Gerät wird gestartet und daher muss ein weiterer Stream verarbeitet werden. Aber wie fügt man diesen neuen Stream on-the-fly hinzu?

Antwort

1

Es ist nicht möglich, neue Streams zur Laufzeit einem Flink-Programm hinzuzufügen.

Die Lösung dieses Problems besteht darin, einen Stream zu haben, der alle eingehenden Ereignisse enthält (z. B. ein Kafka-Thema, in das Sie alle einzelnen Streams aufnehmen). Die Ereignisse sollten einen Schlüssel haben, aus welchem ​​Strom sie kommen. Dieser Schlüssel kann dann dazu verwendet werden, den Stream keyBy und eine per Key Verarbeitungslogik anzuwenden.

Wenn Sie von mehreren Sockeln lesen möchten, könnten Sie Ihre eigene SourceFunction schreiben, die von einem Eingang liest (z. B. von einem festen Sockel) die Ports, um einen Sockel für zu öffnen. Dann könntest du intern alle diese Sockel offen halten und von ihnen round robin lesen.