Schreiben zu kafka Thema von innerhalb einer Kartenfunktion (SCALA)?zu kafka Thema innerhalb einer Kartenmethode veröffentlichen
- von einem kafka Thema in einer Anwendung FLINK Lesen
- die Daten innerhalb einer Map-Funktion bearbeitet
- Problemstellung - Innerhalb der Karte Funktion, die ich durch eine Liste bin Looping. Für jedes Element in der Liste möchte ich ein Kafka-Thema veröffentlichen.
- wenn ich die Ausgabe von der Karte und Spüle es funktioniert, aber wenn ich eine Push-to-Thema aus den Karten Methode versuchen tut es nicht
Ist es möglich, aus einer Karte Methode Thema zu veröffentlichen
// Main Function def main(args: Array[String]) { ... // some list val list_ = ("a", "b", "c", "d") // Setup Properties val props = new Properties() props.setProperty("zookeeper.connect", zookeeper_url + ":" + zookeeper_port) props.setProperty("bootstrap.servers", broker_url + ":" + broker_port) props.setProperty("auto.offset.reset", "earliest") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") ... // Connect to Source val input_stream = env.addSource(new FlinkKafkaConsumer09[String](topic_in, new SimpleStringSchema(), properties)) // Process each Record val stream = input_stream.map(x=> { // loop through list "list_" -> variable in in Main // and publish to topic_out // -- THIS IS MY CURRENT ISSUE !!!) // -- Does not work (No compile issue) // var producer2 = new KafkaProducer[String, String](props) var record = new ProducerRecord(topic_out, "KEY", list(i)) producer2.send(record) producer2.flush() // ... Other process and return processed string }) // publish to different topic of proccessed input string (Works) stream.addSink(new FlinkKafkaProducer09[String](broker_url + ":" + broker_port, other_topic, new SimpleStringSchema()))
Was "funktioniert nicht"? Warum erstellst du den Produzenten in der Schleife? Was ist der Deal mit all den "Vars"? Was ist 'ich' ???? Warum benutzen Sie verschiedene Erzeugerklassen innerhalb und außerhalb der 'Karte'? – Dima
"funktioniert nicht" - Wenn ich den Job einreiche, flink. es läuft der Job aber es scheint, es geht nicht in die Kartenfunktion (wenn der Hersteller-Code enthalten ist) " –
Ich versuche, eine Liste basierend auf einer Bedingung aus dem Eingabestream zu durchlaufen und zu einem Thema der Artikel zu veröffentlichen Interesse –