Zuvor habe ich 0,8 API verwendet. Wenn Sie die Themenliste an sie übergeben, wird eine Karte mit Streams zurückgegeben (ein Eintrag pro Thema). Dadurch kann ich einen separaten Thread erstellen und jedem Zweig den entsprechenden Stream zuweisen. Da zu jedem Thema zu viele Daten vorhanden sind, hilft ein separater Thread beim Multitasking.kafka new api 0.10 bietet keine Liste von Stream- und Consumer-Objekten pro Thema
//0.8 code sample
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer.createMessageStreams(topicCountMap);
Ich möchte auf 0,10 aktualisieren. Ich überprüfte KafkaStreams
und KafkaConsumer
Klassen. KafkaConsumer
Das Objekt übernimmt die Konfigurationseigenschaften und stellt die subscribe-Methode bereit, die die Themenliste akzeptiert. Der Rückgabetyp ist ungültig. Ich kann keinen Weg finden, zu jedem Thema einen Griff zu bekommen.
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(topicsList);
conusmer.poll(long ms)
KafkaStreams
auf der anderen Seite scheint das gleiche Problem zu haben.
KStreamBuilder builder = new KStreamBuilder();
String [] topics = new String[] {"topic1", "topic2"};
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics);
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
Es gibt source.foreach()
Methode zur Verfügung, aber es ist ein Strom von allen Themen. Irgendjemand, irgendwelche Ideen?
Beantworten Sie Ihre Frage, ja, ich werde mehrere Verbraucher erstellen, jeder für jedes Thema. Es ist nur so, dass es sich nicht richtig anfühlt, jedes Mal eine Verbindung für ein neues Thema zu erstellen, im Vergleich zum Aufruf von createMessageStreams (topicMapCount) von alter API, die eine Karte von Streams zurückgibt. Wie auch immer, danke für eine ausführliche Antwort. – colossal