2

Zuvor habe ich 0,8 API verwendet. Wenn Sie die Themenliste an sie übergeben, wird eine Karte mit Streams zurückgegeben (ein Eintrag pro Thema). Dadurch kann ich einen separaten Thread erstellen und jedem Zweig den entsprechenden Stream zuweisen. Da zu jedem Thema zu viele Daten vorhanden sind, hilft ein separater Thread beim Multitasking.kafka new api 0.10 bietet keine Liste von Stream- und Consumer-Objekten pro Thema

//0.8 code sample 
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = 
consumer.createMessageStreams(topicCountMap); 

Ich möchte auf 0,10 aktualisieren. Ich überprüfte KafkaStreams und KafkaConsumer Klassen. KafkaConsumer Das Objekt übernimmt die Konfigurationseigenschaften und stellt die subscribe-Methode bereit, die die Themenliste akzeptiert. Der Rückgabetyp ist ungültig. Ich kann keinen Weg finden, zu jedem Thema einen Griff zu bekommen.

KafkaConsumer consumer = new KafkaConsumer(props); 
consumer.subscribe(topicsList); 
conusmer.poll(long ms) 

KafkaStreams auf der anderen Seite scheint das gleiche Problem zu haben.

KStreamBuilder builder = new KStreamBuilder(); 
String [] topics = new String[] {"topic1", "topic2"}; 
KStream<byte[], byte[]> source = builder.stream(stringSerde, stringSerde, topics); 
KafkaStreams streams = new KafkaStreams(builder, props); 
streams.start(); 

Es gibt source.foreach() Methode zur Verfügung, aber es ist ein Strom von allen Themen. Irgendjemand, irgendwelche Ideen?

Antwort

2

zunächst eine Multi-Threaded Verbraucher ist schwierig, so dass das Muster, das Sie beschäftigt in 0.8 verwendet, wird hoffentlich gut gestaltet :)

Best Practice ist ein Single-Threaded-Verbraucher zu verwenden, und somit gibt es „keine Notwendigkeit“ zu Trennen Sie verschiedene Themen, wenn ein einzelner Verbraucher gleichzeitig eine Themenliste abonniert. Beim Datensatzverbrauch liefert das Datensatzobjekt jedoch Informationen darüber, von welchem ​​Thema es stammt (es enthält diese Metadaten). Somit könnte man einen Datensatz theoretisch nach seinen Themen zu einem anderen Thread für die eigentliche Bearbeitung versenden (auch wenn dies nicht zu empfehlen ist!).

Kafka Waagen aus über Partitionen damit, wenn ein Single-Threaded-Verbraucher nicht in der Lage ist, die Last zu bewältigen, sollten Sie mehrere Verbraucher (als Verbrauchergruppe) zu skalieren Ihre Verbraucher Verarbeitungskapazität starten.

Eine allgemeinere Frage: Wenn Sie Daten pro Thema verarbeiten möchten, warum verwenden Sie nicht mehrere Konsumenten, die jeweils ein einzelnes Thema abonnieren?

Last but not least, in Apache Kafka 0.10+ der Kafka Streams API ist eine neu eingeführte Stromverarbeitung Bibliothek - obwohl es nicht mit 0,8 KafkaStream Klasse (Hinweis, es gibt keine "s") verwechselt werden darf. Beide sind völlig unabhängig voneinander.

+0

Beantworten Sie Ihre Frage, ja, ich werde mehrere Verbraucher erstellen, jeder für jedes Thema. Es ist nur so, dass es sich nicht richtig anfühlt, jedes Mal eine Verbindung für ein neues Thema zu erstellen, im Vergleich zum Aufruf von createMessageStreams (topicMapCount) von alter API, die eine Karte von Streams zurückgibt. Wie auch immer, danke für eine ausführliche Antwort. – colossal

Verwandte Themen