wir haben ein Problem im Zusammenhang mit der Migration des Codes unserer Anwendung von Version 0.8.2.1 zu 0.9.0.0 von Apache Kafka.Portierungs-App von Kafka 0.8.2.1 nach Kafka 0.9.0. Lesen von Offsets Ausgabe
Wir beziehen uns, in diesem Fall auf die Version von Kafka von Cloudera-Freigabe:
kafka_2.10-0.8.2.0-kafka-1.3.2
kafka_2.11-0.9.0- kafka-2.0.2
Wir haben das Problem beim Lesen und Schreiben der Offsets im Metadaten-Thema __consumer_offsets festgestellt. Insbesondere verwenden wir einen BlockingChannel, um eine Verbindung zum Kafka-Broker herzustellen, und zum Zeitpunkt des Aufrufs der receive() -Methode erhalten wir eine EOFException.
Insbesondere:
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)
Eine mögliche Ursache könnte die Unterschiede zwischen den beiden Versionen von Kafka API sein.
Kafka 0.8.2
in unserer App, wir rufen Methode
ConsumerMetadataResponse.readFrom(channel.receive().buffer())
die erhalten ist als
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
response
}
folgt, wie wir es gibt eine kafka.network sehen .Receive, das ist eine Eigenschaft, die das Merkmal kafka.network.Transmission erweitert. In dieser Erhalten besteht die Puffer-Methode und wird in kafka.network.BoundedByteBufferReceive
def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
Kafka 0.9.0
Wir die vorherige Linie
GroupCoordinatorResponse.readFrom(channel.receive().payload())
die zu
geändert überschrieben erhalten Methode in dieser Version der API ist wie folgtdef receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
val response = readCompletely(readChannel)
response.payload().rewind()
response
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}
Wie wir sehen können, gibt dies stattdessen eine kafka.network.NetworkReceive zurück, eine Klasse, die die Schnittstelle kafka.network.Receive implementiert, jetzt in Java geschrieben und komplett anders als die vorherige. Hier gibt es keine Pufferverfahren, sondern nur eine Nutzlast Methode, die den Inhalt von
private ByteBuffer buffer;
Wie konnten wir lösen zurückkehrt? Vielen Dank im Voraus