2016-06-07 7 views
1

Ich weiß, dass es ziemlich viele node.js Module gibt, die einen Kafka-Verbraucher implementieren, der Nachrichten erhält und in elastisch schreibt. Aber ich brauche nur einige der Felder von jeder msg und nicht alle von ihnen. Gibt es eine bestehende Lösung, von der ich nichts weiß?Kafka zu Elasticsearch Verbrauch mit node.js

+2

Ich machte den gleichen Fehler wie @pickypg und antwortete im Allgemeinen (die Antwort wurde aufgrund des Off-Topic entfernt), aber es würde wahrscheinlich helfen, das "node.js" -Tag hinzuzufügen. – JDP10101

Antwort

2

Die Frage fragt nach einem Beispiel von node.js. Die kafka-node module provides a very nice mechanism for getting a Consumer, die Sie mit dem elasticsearch-js Modul kombinieren:

// configure Elasticsearch client 
var elasticsearch = require('elasticsearch'); 
var esClient = new elasticsearch.Client({ 
    // ... connection details ... 
}); 
// configure Kafka Consumer 
var kafka = require('kafka-node'); 
var Consumer = kafka.Consumer; 
var client = new kafka.Client(); 
var consumer = new Consumer(
    client, 
    [ 
    // ... topics/partitions ... 
    ], 
    { autoCommit: false } 
); 

consumer.on('message', function(message) { 
    if (message.some_special_field === "drop") { 
    return; // skip it 
    } 

    // drop fields (you can use delete message['field1'] syntax if you need 
    // to parse a more dynamic structure) 
    delete message.field1; 
    delete message.field2; 
    delete message.field3; 

    esClient.index({ 
    index: 'index-name', 
    type: 'type-name', 
    id: message.id_field, // ID will be auto generated if none/unset 
    body: message 
    }, function(err, res) { 
    if (err) { 
     throw err; 
    } 
    }); 
}); 

consumer.on('error', function(err) { 
    console.log(err); 
}); 

HINWEIS: Mit dem Index-API ist keine gute Praxis, wenn Sie haben jede Menge Nachrichten über gesendet werden, weil es erfordert, dass Elasticsearch einen Thread erstellen pro Operation, die offensichtlich verschwenderisch ist und zu abgelehnten Anfragen führen wird, wenn der Thread-Pool als Ergebnis erschöpft ist. In jeder Bulk-Aufnahmesituation, eine bessere Lösung ist zu prüfen, mit etwas wie Elasticsearch Streams (oder Elasticsearch Bulk Index Stream, die auf ihm baut), die oben auf dem offiziellen elasticsearch-js-Client baut. Allerdings habe ich nie diese Client-Erweiterungen verwendet, so dass ich nicht wirklich weiß, wie gut sie funktionieren oder nicht, aber die Verwendung würde einfach den Teil ersetzen, auf dem ich die Indizierung vorstelle.

Ich bin nicht davon überzeugt, dass der Ansatz von node.js eigentlich besser ist als der von Logstash, was Wartung und Komplexität betrifft, deshalb habe ich beide hier als Referenz hinterlassen.

Der bessere Ansatz ist wahrscheinlich, Kafka von Logstash zu konsumieren, und dann an Elasticsearch auszuliefern.

Sie können Logstash verwenden, um dies auf einfache Weise mit den Kafka input und Elasticsearch output zu tun.

Jedes Dokument in der Logstash-Pipeline wird als "Ereignis" bezeichnet. Die Kafka-Eingabe geht davon aus, dass sie JSON empfangen wird (konfigurierbar über ihren Codec), der ein einzelnes Ereignis mit allen Feldern dieser Nachricht füllt.

Sie können dann die Felder löschen, an denen Sie kein Interesse haben, oder die gesamte Veranstaltung bedingungsbedingt abbrechen.

input { 
    # Receive from Kafka 
    kafka { 
    # ... 
    } 
} 

filter { 
    if [some_special_field] == "drop" { 
    drop { } # skip the entire event 
    } 

    # drop specific fields 
    mutate { 
    remove_field => [ 
     "field1", "field2", ... 
    ] 
    } 
} 

output { 
    # send to Elasticsearch 
    elasticsearch { 
    # ... 
    } 
} 

Natürlich müssen Sie den Kafka-Eingang konfigurieren (aus dem ersten Link) und den Elasticsearch Ausgang (und den zweiten Link).

+0

Das sieht wie eine gute Antwort aus, aber nicht für Knoten –

+1

Oh, wops. Ich habe "Knoten" wie einen ES-Knoten falsch gelesen. Nicht node.js. :) Mir ist nichts bekannt, was in node.js gleich ist, aber 'kafka-node' macht es ziemlich einfach, was mit dem' elasticsearch-js' Client kombiniert werden kann, um das Gleiche zu tun. – pickypg

+0

Ich füge ein kurzes Beispiel hinzu. – pickypg

Verwandte Themen