Ich bin ziemlich neu in Kafka und Kafka Streams also bitte mit mir ertragen. Ich würde gerne wissen, ob ich hier auf dem richtigen Weg bin.Kafka Streams App separat liest aus schreibt
Ich schreibe gerade zu einem Kafka-Thema und versuche über einen Rest-Service auf die Daten zuzugreifen. Die Rohdatenart muss transformiert werden, bevor auf sie zugegriffen wird.
Was ich bisher habe, ist ein Produzent, der die Rohdaten in ein Thema schreibt.
1.) Jetzt möchte ich Streams App (sollte ein Glas in einem Container ausgeführt werden), die nur die Daten in meiner gewünschten Form transformiert. Dem materialisierten Ansichtsparadigma folgend.
über vereinfachte Version von 1.)
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source =
builder.stream("my-raw-data-topic");
KafkaStreams streams = new KafkaStreams(builder,props);
KTable<String, Long> t = source.groupByKey().count("My-Table");
streams.start();
2.) und einer anderen Strom App (sollte in einem Behälter ein Gefäß laufen), die die KTable
als eine Art Repository justs hält, die sein kann Zugriff über einen Wickel-Rest-Service.
Hier bin ich irgendwie fest mit der richtigen Art und Weise mit der API zu arbeiten. Was ist die bloße Minimun auf einen KTable
zugreifen und abfragen? Muss ich die Transformationstopologie erneut dem Builder zuweisen?
KStreamBuilder builder = new KStreamBuilder();
KTable table = builder.table("My-Table"); //Casting?
KafkaStreams streams = new KafkaStreams(builder, props);
RestService service = new RestService(table);
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly
Gerade jetzt ist diese Pseudo-Code
Bin ich auf dem richtigen Weg hier? Ist es sinnvoll zu trennen 1.) und 2.)? Ist dies die eingeprägte Methode, mit Streams zu arbeiten, um Ansichten zu materialisieren? Für mich hätte es den Vorteil, die Schreibvorgänge und die Lesevorgänge über Container unabhängig zu skalieren, wo ich mehr Traffic sehe.
Wie die repopulating der K-Tabelle ist behandelt auf einem Absturz entweder 1.) oder 2.). Ist dies über die Replikation auf die Streaming-API geschehen, oder müsste ich etwas per Code ansprechen? Wie den Cursor zurücksetzen und die Ereignisse beantworten?
Thx für die Antworten. Gibt es einen Unterschied zwischen StateStore und KTable? Weil ich gerade an einen Tisch schreibe und der "Rest Service" das KTable nicht über dein 3. Snippet findet. Ich greife mit streams.store darauf zu ("Table-Name", ..) Muss es ein StateStore sein? – silverfighter
Für diesen Fall ist es das Gleiche. Wenn Sie ein KTable erhalten, nennen Sie das KTable nicht, aber es ist ein interner Speicher. (Beachten Sie, dass der Parametername bei der Zählmethode "storeName" lautet). Also nicht sicher, warum der Laden nicht gefunden wird. Könnte ein Fehler in Ihrer RestService-Implementierung sein. –
thx, könnte der Fall sein Ich habe "org.apache.kafka.streams.errors.InvalidStateStoreException: der State Store, countByApi, kann in eine andere Instanz migriert haben." In einer Einzelinstanz env on docker - und die Tabelle ist als Thema über die Befehlszeile sichtbar. – silverfighter