2016-12-14 3 views
1

Ich bin der Ignite und Kafka Integration versucht kafka Nachricht in Ignite-Cache zu bringen.Ignite und Kafka Integration

Meine Nachrichten-Taste ist eine zufällige Zeichenfolge (mit Ignite umgehen, die kafkas Nachrichtenschlüssel kann nicht null sein), und der Wert ist eine JSON-String-Darstellung für je Person (eine Java-Klasse)

Wenn Ignite empfängt So eine Nachricht, es sieht so aus, dass Ignite den Schlüssel der Nachricht (die zufällige Zeichenfolge in meinem Fall) als Cache-Schlüssel verwendet.

Ist es möglich, den Nachrichtenschlüssel zu der Person ID zu ändern, so dass ich die in den Cache setzen.

Sieht aus, dass streamer.receiver(new StreamReceiver) praktikabel ist

 streamer.receiver(new StreamReceiver<String, String>() { 
      public void receive(IgniteCache<String, String> cache, Collection<Map.Entry<String, String>> entries) throws IgniteException { 
       for (Map.Entry<String, String> entry : entries) { 
        Person p = fromJson(entry.getValue()); 
        //ignore the message key,and use person id as the cache key 
        cache.put(p.getId(), p); 
       } 
      } 
     }); 

Ist dies der empfohlene Weg? und ich bin mir nicht sicher, ob das Aufrufen von cache.put in StreamReceiver ein korrekter Weg ist, da es nur ein Vorverarbeitungsschritt vor dem Schreiben in den Cache ist.

Antwort

2

Datenstreamer alle Ihre Schlüssel Karte Affinität Knoten zwischenzuspeichern, erstellen Chargen von Einträgen und senden Chargen Affinität Knoten. Nachdem es StreamReceiver Ihre Eingaben erhalten, bekommen Person ‚s ID und rufen cache.put(K, V). Die Eingabe führt dazu, dass Sie Ihren Schlüssel dem entsprechenden Cache-Affinitätsknoten zuordnen und eine Aktualisierungsanforderung an diesen Knoten senden.

Alles sieht gut aus. Aber das Ergebnis der Zuordnung Ihres zufälligen Schlüssels von Kafka und das Ergebnis der Zuordnung Person ID wird anders sein (am wahrscheinlichsten verschiedene Knoten). Als Ergebnis erhalten Sie aufgrund redundanter Netzwerk-Hops eine schlechte Leistung.

Leider unterstützt aktuelle KafkaStreamer Implementierungen Stream Tuple Extraktoren nicht (siehe z. B. StreamSingleTupleExtractor Klasse). Sie können aber auch Ihre eigene Kafka-Streamer-Implementierung erstellen, indem Sie die vorhandene Kafka-Streamer-Implementierung als Beispiel verwenden.

Sie können auch Gebrauch KafkaStreamer 's keyDecoder und valDecoder zu extrahieren, um Person' s ID von Kafka Nachricht versuchen. Ich bin mir nicht sicher, aber es kann helfen.

+0

Dank @a_gura für die hilfreiche Antwort! – Tom

Verwandte Themen