2017-08-11 4 views
1

Ich bin ziemlich neu in Kafka und Kafka Streams also bitte mit mir ertragen. Ich würde gerne wissen, ob ich hier auf dem richtigen Weg bin.Kafka Streams App separat liest aus schreibt

Ich schreibe gerade zu einem Kafka-Thema und versuche über einen Rest-Service auf die Daten zuzugreifen. Die Rohdatenart muss transformiert werden, bevor auf sie zugegriffen wird.

Was ich bisher habe, ist ein Produzent, der die Rohdaten in ein Thema schreibt.

1.) Jetzt möchte ich Streams App (sollte ein Glas in einem Container ausgeführt werden), die nur die Daten in meiner gewünschten Form transformiert. Dem materialisierten Ansichtsparadigma folgend.

über vereinfachte Version von 1.)

KStreamBuilder builder = new KStreamBuilder(); 

    KStream<String, String> source = 
    builder.stream("my-raw-data-topic"); 

    KafkaStreams streams = new KafkaStreams(builder,props); 
    KTable<String, Long> t = source.groupByKey().count("My-Table"); 
    streams.start(); 

2.) und einer anderen Strom App (sollte in einem Behälter ein Gefäß laufen), die die KTable als eine Art Repository justs hält, die sein kann Zugriff über einen Wickel-Rest-Service.

Hier bin ich irgendwie fest mit der richtigen Art und Weise mit der API zu arbeiten. Was ist die bloße Minimun auf einen KTable zugreifen und abfragen? Muss ich die Transformationstopologie erneut dem Builder zuweisen?

KStreamBuilder builder = new KStreamBuilder(); 
KTable table = builder.table("My-Table"); //Casting? 
KafkaStreams streams = new KafkaStreams(builder, props); 

RestService service = new RestService(table); 
// Use the Table as Repository which is wrapped by a Rest-Service and gets updated reactivly 

Gerade jetzt ist diese Pseudo-Code

Bin ich auf dem richtigen Weg hier? Ist es sinnvoll zu trennen 1.) und 2.)? Ist dies die eingeprägte Methode, mit Streams zu arbeiten, um Ansichten zu materialisieren? Für mich hätte es den Vorteil, die Schreibvorgänge und die Lesevorgänge über Container unabhängig zu skalieren, wo ich mehr Traffic sehe.

Wie die repopulating der K-Tabelle ist behandelt auf einem Absturz entweder 1.) oder 2.). Ist dies über die Replikation auf die Streaming-API geschehen, oder müsste ich etwas per Code ansprechen? Wie den Cursor zurücksetzen und die Ereignisse beantworten?

Antwort

0

Paar Kommentare:

In Ihrem Code-Schnipsel (1) Sie Ihre Topologie ändern, nachdem Sie den Generator in die KafkaStreams Konstruktor übergeben:

KafkaStreams streams = new KafkaStreams(builder,props); 
// don't modify builder anymore! 

Sie sollten dies nicht tun, aber zuerst Sie Topologie angeben und erstellen Sie anschließend die KafkaStreams Instanz.

Über teilen Sie Ihre Anwendung in zwei. Dies kann sinnvoll sein, um beide Teile unabhängig voneinander zu skalieren. Aber es ist im Allgemeinen schwer zu sagen. Wenn Sie jedoch beide spucken, muss der erste das transformierte Datum in ein Ausgabethema schreiben und das zweite sollte dieses Ausgabethema als Tabelle lesen (builder.table("output-topic-of-transformation"), um die REST-Anforderungen zu bedienen.

Für den Laden der K-Tabelle zugreifen, müssen Sie über die mitgelieferten Speichernamen eine Abfrage Griff bekommen:

ReadOnlyKeyValueStore keyValueStore = 
streams.store("My-Table", QueryableStoreTypes.keyValueStore()); 

Lesen Sie die Dokumentation für weitere Details:

http://docs.confluent.io/current/streams/developer-guide.html#interactive-queries

+0

Thx für die Antworten. Gibt es einen Unterschied zwischen StateStore und KTable? Weil ich gerade an einen Tisch schreibe und der "Rest Service" das KTable nicht über dein 3. Snippet findet. Ich greife mit streams.store darauf zu ("Table-Name", ..) Muss es ein StateStore sein? – silverfighter

+0

Für diesen Fall ist es das Gleiche. Wenn Sie ein KTable erhalten, nennen Sie das KTable nicht, aber es ist ein interner Speicher. (Beachten Sie, dass der Parametername bei der Zählmethode "storeName" lautet). Also nicht sicher, warum der Laden nicht gefunden wird. Könnte ein Fehler in Ihrer RestService-Implementierung sein. –

+0

thx, könnte der Fall sein Ich habe "org.apache.kafka.streams.errors.InvalidStateStoreException: der State Store, countByApi, kann in eine andere Instanz migriert haben." In einer Einzelinstanz env on docker - und die Tabelle ist als Thema über die Befehlszeile sichtbar. – silverfighter

Verwandte Themen