2017-07-09 4 views
1

Ich habe einen Job, der Daten von Cassandra liest und die Daten als Liste speichert (die Methode fillOnceGeoFencesFromDB() unten angehängt) und dann StreamExecutionEnvironment erzeuge und Daten aus der Kafka-Warteschlange konsumiere.Übergeben von Parametern zwischen Flink-Jobs

Während der Umwandlung von DataStream versuche ich, kürzlich gefüllte statische ArrayList zu referenzieren, aber es ist leer.

Was ist eine Best Practice, um bereits ausgefüllte Listen in den nächsten Job zu übernehmen? Jede Idee wird geschätzt.

private static ArrayList<GeoFences> allGeoFences = new ArrayList<>(); 

    public static void main(String[] args) throws Exception { 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setParallelism(1); 
     env.enableCheckpointing(5000); // checkpoint every 5000 msecs 
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 

     Properties kafkaProps = new Properties(); 
     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST); 
     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER); 
     kafkaProps.setProperty("group.id", KAFKA_GROUP); 
     kafkaProps.setProperty("auto.offset.reset", "earliest"); 

     fillOnceGeoFencesFromDB(); // populate data in ArrayList<GeoFences> allGeoFences 

     DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env 
       .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps)) 
       .rebalance().map(new MapFunction<String, Tuple4<UUID, String, String, Timestamp>>() { 
        private static final long serialVersionUID = 1L; 

        @Override 
        public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception { 
         return mapToTuple4(value); 
        }}) 

. . . . . .

Antwort

0

Bitte beachten Sie, dass alles, was in der Map-Funktion passiert, auf den Task-Managern stattfindet, während der gesamte Code nur zur Definition Ihres Jobs verwendet wird.

Übergeben Sie Ihren Parameter explizit an die MapFunction (Dadurch wird der Code leichter zu lesen).

private static class GeoFenceMapper implements MapFunction<String, Tuple4<UUID, String, String, Timestamp>> { 

    private ArrayList<GeoFences> allGeoFences; 

    public GeoFenceMapper(ArrayList<GeoFences> allGeoFences) { 
     this.allGeoFences = allGeoFences; 
    } 

    @Override 
    public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception { 
     return mapToTuple4(value); 
    }}) 
} 

und als diese neue Mapper verwenden:

DataStream <Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env 
       .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps)) 
       .rebalance().map(new GeoFenceMapper(fillOnceGeoFencesFromDB())) 

hoffe, das hilft!

+0

Vielen Dank für die Erklärung. Sie haben Recht, es funktioniert – Mgreg

+0

Gut zu hören. Fühlen Sie sich frei, die Antwort zu akzeptieren :) – TobiSH

+0

zu niedrigen Ruf Mann ... Ich werde :) – Mgreg

Verwandte Themen