2017-09-28 2 views
0

Ein Teil meines Codes löst die Ausnahme EOF aus, wenn der Kafka-Broker SSL mit Port 9093 abhört, im Klartext-Listener funktioniert das Code-Snippet einwandfrei.EOF-Ausnahme beim Lesen von BlockingChannel auf Kafka unter SSL/TLS Listener

Irgendwelche Ideen, was könnte hier falsch sein ??

 public KafkaMetadataHelper(String kafkaConnect) throws Exception { 
    // use lowlevel kafka.api to query consumer group metadata (ie max committed offset) 
    String[] hostAndPort = kafkaConnect.split(":"); 
    String host = hostAndPort[0]; 
    int port = Integer.parseInt(hostAndPort[1]); 
    channel = new BlockingChannel(host, port, 
            BlockingChannel.UseDefaultBufferSize(), 
            BlockingChannel.UseDefaultBufferSize(), 
            10000); 
    channel.connect(); 
    GroupCoordinatorRequest request = new GroupCoordinatorRequest(MY_GROUP, 
                    GroupCoordinatorRequest.CurrentVersion(), 
                    correlationId++, 
                    MY_CLIENTID); 
    channel.send(request); 
    GroupCoordinatorResponse metadataResponse = null; 
    try { 
     metadataResponse = GroupCoordinatorResponse.readFrom(channel.receive().payload());// This is where the exception is thrown 

    } catch (Exception e) { 
     e.printStackTrace(); 
    } 

}

Die Fehlermeldung, die ich bekommen habe, ist dieses.

java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:103) 
    at kafka.network.BlockingChannel.readCompletely(BlockingChannel.scala:131) 
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:122) 

Antwort

0

Um eine Verbindung über TLS herzustellen, benötigt Ihr Client einige Einstellungen! BlockingChannel ermöglicht dem Anrufer keine Einstellungen vorzunehmen.

Ich schlage vor, Sie sehen ConsumerGroupCommand.scala [1] und sehen, wie es AdminClient [2] verwendet, um Details über Verbrauchergruppen abzurufen.

  1. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L496
  2. https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/AdminClient.scala#L197
+0

Dank Ich werde das Studium und meine Eingaben teilen ... –

Verwandte Themen