2017-12-07 7 views
0

Schreiben zu kafka Thema von innerhalb einer Kartenfunktion (SCALA)?zu kafka Thema innerhalb einer Kartenmethode veröffentlichen

  1. von einem kafka Thema in einer Anwendung FLINK Lesen
  2. die Daten innerhalb einer Map-Funktion bearbeitet
  3. Problemstellung - Innerhalb der Karte Funktion, die ich durch eine Liste bin Looping. Für jedes Element in der Liste möchte ich ein Kafka-Thema veröffentlichen.
  4. wenn ich die Ausgabe von der Karte und Spüle es funktioniert, aber wenn ich eine Push-to-Thema aus den Karten Methode versuchen tut es nicht
  5. Ist es möglich, aus einer Karte Methode Thema zu veröffentlichen

    // Main Function 
    def main(args: Array[String]) { 
    
    ... 
    // some list 
    val list_ = ("a", "b", "c", "d") 
    // Setup Properties 
    val props = new Properties() 
    props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port) 
    props.setProperty("bootstrap.servers", broker_url + ":" + broker_port) 
    props.setProperty("auto.offset.reset", "earliest") 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") 
    
    ... 
    
    // Connect to Source 
    val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties)) 
    
    // Process each Record 
    val stream = input_stream.map(x=> {   
    
        // loop through list "list_" -> variable in in Main 
        // and publish to topic_out 
        // -- THIS IS MY CURRENT ISSUE !!!) 
        // -- Does not work (No compile issue) 
        // 
        var producer2 = new KafkaProducer[String, String](props) 
        var record = new ProducerRecord(topic_out, "KEY", list(i)) 
        producer2.send(record) 
        producer2.flush() 
    
    // ... Other process and return processed string 
    
    }) 
    
    // publish to different topic of proccessed input string (Works) 
    stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema())) 
    
+0

Was "funktioniert nicht"? Warum erstellst du den Produzenten in der Schleife? Was ist der Deal mit all den "Vars"? Was ist 'ich' ???? Warum benutzen Sie verschiedene Erzeugerklassen innerhalb und außerhalb der 'Karte'? – Dima

+0

"funktioniert nicht" - Wenn ich den Job einreiche, flink. es läuft der Job aber es scheint, es geht nicht in die Kartenfunktion (wenn der Hersteller-Code enthalten ist) " –

+0

Ich versuche, eine Liste basierend auf einer Bedingung aus dem Eingabestream zu durchlaufen und zu einem Thema der Artikel zu veröffentlichen Interesse –

Antwort

3

Sie keinen kafka Produzenten innerhalb einer Kartenfunktion machen und versuchen Sie nicht innerhalb von einer Karte auf einen kafka Thema zu schreiben. Ehrlich gesagt, ich kann nichts sagen, es ist eine schlechte Idee ... aber es ist eine schlechte Idee.

Stattdessen. Ändern Sie Ihre Kartenfunktion in eine FlatMap (siehe das erste Beispiel hier: https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html).

Also in Ihrer Schleife, anstatt einen Kafka-Produzent jede Schleife zu machen, tun Sie einfach collector.collect(recordToPublishToKafka).

Und Ihre Spüle wird jedes herausgeben, wie sie gesammelt werden.

+0

Danke, das hilft.Ich stimme Ihnen zu (ich lerne immer noch das Flink/Kafka Werkzeugset, daher die schlechte Idee ...) wird dauern Ihr Rat Viele Dank und Grüße –

Verwandte Themen