2016-11-11 5 views
3

Ich habe vor kurzem einige Probleme mit der Entwicklung von Flink Jobs, die Spring und Hibernate eingeführt haben, und der Job würde auf Flink Cluster laufen. Also muss ich die Spring-Ressource initialisieren, bevor ich flink-Operatoren anstelle des Job-Managers auf den Task-Manager setze. Aber ich kann dafür keine geeignete Methode von StreamExecutionEnvironment finden.Wie initialisiert man die Spring Ressource von flink Job auf flink Umgebung

Ich habe dies einige Ansätze versucht, wie unten:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); 
// etl business logic as flink operators 
FlinkOperators.run(); 
env.execute(); 

Wenn jedoch der flink Job, deren Parallelität mehr als ein ausführt, wird die Feder Initialisierung NICHT in jedem Task-Manager-Prozess sein. Also kann ich den Frühling im Flink-Job nicht benutzen.

Gibt es einen Ansatz zur Initialisierung von Federressourcen auf Flink-Job?

Danke.

Mit freundlichen Grüßen.

Alvin

+0

Ich hatte das Problem gerade, haben Sie die Lösung gefunden? – zt1983811

Antwort

0

Jedesmal, benötigen Sie irgendeine Art von Kontext-Initialisierung für jeden Task-Manager haben, eine statische Funktion (eine Funktion innerhalb eines Objekts, wenn Sie scala verwenden) zu bauen, dass diejenigen initialisierten Werte innerhalb einer statischen Variablen speichert.

Das sollte reichen, da statische Werte im Speicher jedes Task Managers gespeichert werden.

Ich verwende diese Vorgehensweise, um Eigenschaftendateien in jedem Task-Manager zu laden. Diese Eigenschaftendateien enthalten pro Jobkonfiguration. Wenn Sie Dateien laden, prüfen Sie, ob jeder Taskmanager eine Kopie der Datei hat, die Sie laden möchten.