2017-01-18 3 views
3

wir haben ein Problem im Zusammenhang mit der Migration des Codes unserer Anwendung von Version 0.8.2.1 zu 0.9.0.0 von Apache Kafka.Portierungs-App von Kafka 0.8.2.1 nach Kafka 0.9.0. Lesen von Offsets Ausgabe

Wir beziehen uns, in diesem Fall auf die Version von Kafka von Cloudera-Freigabe:

kafka_2.10-0.8.2.0-kafka-1.3.2

kafka_2.11-0.9.0- kafka-2.0.2

Wir haben das Problem beim Lesen und Schreiben der Offsets im Metadaten-Thema __consumer_offsets festgestellt. Insbesondere verwenden wir einen BlockingChannel, um eine Verbindung zum Kafka-Broker herzustellen, und zum Zeitpunkt des Aufrufs der receive() -Methode erhalten wir eine EOFException.

Insbesondere:

java.io.EOFException 
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83) 
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129) 
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120) 

Eine mögliche Ursache könnte die Unterschiede zwischen den beiden Versionen von Kafka API sein.

Kafka 0.8.2

in unserer App, wir rufen Methode

ConsumerMetadataResponse.readFrom(channel.receive().buffer()) 

die erhalten ist als

def receive(): Receive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = new BoundedByteBufferReceive() 
    response.readCompletely(readChannel) 

    response 
} 

folgt, wie wir es gibt eine kafka.network sehen .Receive, das ist eine Eigenschaft, die das Merkmal kafka.network.Transmission erweitert. In dieser Erhalten besteht die Puffer-Methode und wird in kafka.network.BoundedByteBufferReceive

def buffer: ByteBuffer = { 
    expectComplete() 
    contentBuffer 
    } 

Kafka 0.9.0

Wir die vorherige Linie

GroupCoordinatorResponse.readFrom(channel.receive().payload()) 

die zu

geändert überschrieben erhalten Methode in dieser Version der API ist wie folgt

def receive(): NetworkReceive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = readCompletely(readChannel) 
    response.payload().rewind() 

    response 
    } 

    private def readCompletely(channel: ReadableByteChannel): NetworkReceive =  { 
    val response = new NetworkReceive 
    while (!response.complete()) 
     response.readFromReadableChannel(channel) 
    response 
    } 

Wie wir sehen können, gibt dies stattdessen eine kafka.network.NetworkReceive zurück, eine Klasse, die die Schnittstelle kafka.network.Receive implementiert, jetzt in Java geschrieben und komplett anders als die vorherige. Hier gibt es keine Pufferverfahren, sondern nur eine Nutzlast Methode, die den Inhalt von

    private ByteBuffer buffer; 

Wie konnten wir lösen zurückkehrt? Vielen Dank im Voraus

Antwort

0

Kafka 0.9 behält den alten Kafka-Konsumenten bei, um Abwärtskompatibilität mit Kafka 0.8.2 Brokern zu erreichen. Sie verwenden den alten Verbraucher, der in Kafka 0.9 noch vorhanden ist, um Nachrichten von Kafka 0.9 zu lesen. Sie sollten die neue Konsumenten-API von Kafka 0.9 verwenden, um Daten von Kafka 0.9-Brokern zu lesen.

Hoffe, das hilft.