2016-10-20 6 views
1

Ich verwende HighlevelProducer und HighlevelConsumer, um Nachrichten zu senden und zu empfangen. Der HighlevelConsumer ist mit autoCommit = false konfiguriert, da ich Nachrichten nur dann festschreiben möchte, wenn sie erfolgreich erstellt wurden. Das Problem ist, dass die erste Nachricht nie wirklich vergeben wird.Falsche Commit-Reihenfolge bei Verwendung von AutoCommit = false in HighlevelConsumer

Beispiel:

  • senden Nachrichten 1-10.
  • Erhalten Nachricht 1
  • Receive Message 2
  • Commit Nachricht 2
  • ...
  • Nachricht 10
  • Empfangen der Nachricht Commit 10
  • Nachricht Commit 1

Wenn ich Starten Sie meinen Consumer neu, alle Nachrichten von 1 bis 10 werden erneut verarbeitet. Nur wenn ich neue Nachrichten an den Kunden sende, werden die alten Nachrichten festgeschrieben. Dies geschieht für eine beliebige Anzahl von Nachrichten.

My-Code lautet wie folgt:

var kafka = require('kafka-node'), 
    HighLevelConsumer = kafka.HighLevelConsumer, 
    client = new kafka.Client("localhost:2181/"); 
consumer = new HighLevelConsumer(
    client, 
    [ 
     { topic: 'mytopic' } 
    ], 
    { 
     groupId: 'my-group', 
     id: "my-consumer-1", 
     autoCommit: false 
    } 
); 

consumer.on('message', function (message) { 
    console.log("consume: " + message.offset); 
    consumer.commit(function (err, data) { 
     console.log("commited:" + message.offset); 
    }); 
    console.log("consumed:" + message.offset); 
}); 

process.on('SIGINT', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 

process.on('exit', function() { 
    consumer.close(true, function() { 
     process.exit(); 
    }); 
}); 
var messages = 10; 
var kafka = require('kafka-node'), 
    HighLevelProducer = kafka.HighLevelProducer, 
    client = new kafka.Client("localhost:2181/"); 
var producer = new HighLevelProducer(client, { partitionerType: 2, requireAcks: 1 }); 

producer.on('error', function (err) { console.log(err) }); 
producer.on('ready', function() { 
    for (i = 0; i < messages; i++) { 
     payloads = [{ topic: 'mytopic', messages: "" }]; 
     producer.send(payloads, function (err, data) { 
      err ? console.log(i + "err", err) : console.log(i + "data", data); 
     }); 
    } 
}); 

Bin ich etwas falsch zu machen oder ist das ein Fehler in kafka-Knoten?

+0

Mögliches Duplikat von [Warum commitAsync die ersten 2 Offsets nicht festlegt] (http://stackoverflow.com/questions/37794718/why-commitasasc-fails-to-commit-the-first-2-offsets) –

Antwort

0

Eine Übertragung Nachricht 2 wird eine implizite der Commit-Nachricht 1.

Wie Sie Commits werden asynchron ausgeführt, und Commit Nachricht 1 und Nachricht 2 fertig sind schnell nach einander (dh geschieht begehen 2, bevor die Consumer hat das Commit von 1) gesendet, das erste Commit wird nicht explizit passieren und nur ein Commit von Message 2 wird gesendet.

Verwandte Themen