Ich habe einen Job, der Daten von Cassandra liest und die Daten als Liste speichert (die Methode fillOnceGeoFencesFromDB() unten angehängt) und dann StreamExecutionEnvironment erzeuge und Daten aus der Kafka-Warteschlange konsumiere.Übergeben von Parametern zwischen Flink-Jobs
Während der Umwandlung von DataStream versuche ich, kürzlich gefüllte statische ArrayList zu referenzieren, aber es ist leer.
Was ist eine Best Practice, um bereits ausgefüllte Listen in den nächsten Job zu übernehmen? Jede Idee wird geschätzt.
private static ArrayList<GeoFences> allGeoFences = new ArrayList<>();
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
kafkaProps.setProperty("group.id", KAFKA_GROUP);
kafkaProps.setProperty("auto.offset.reset", "earliest");
fillOnceGeoFencesFromDB(); // populate data in ArrayList<GeoFences> allGeoFences
DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env
.addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps))
.rebalance().map(new MapFunction<String, Tuple4<UUID, String, String, Timestamp>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception {
return mapToTuple4(value);
}})
. . . . . .
Vielen Dank für die Erklärung. Sie haben Recht, es funktioniert – Mgreg
Gut zu hören. Fühlen Sie sich frei, die Antwort zu akzeptieren :) – TobiSH
zu niedrigen Ruf Mann ... Ich werde :) – Mgreg