2016-07-17 3 views
0

Ich versuche, zwei Kafka Stream-DSL KTable mit beitreten:zwei Kafka K-Tabelle führt zu einer Nullpointer in RocksDB Joining

KTable<String, String> source = builder.table("stream-source"); 
KTable<String, String> target = builder.table("stream-target"); 
source.join(target, new ValueJoiner<String, String, String>() { 
    public String apply(String value1, String value2) { 
     return value1 + ":" + value2; 
    } 
}); 

ich sicher gemacht haben, dass sowohl die Schlüssel und die Werte sind nicht null:

Producer<String, String> producer = new KafkaProducer<String, String>(props); 
for(int i = 0; i < PERSONS_SOURCE.length; i++) { 
    producer.send(new ProducerRecord<String, String>("stream-source",  Long.toString(i + 1L), PERSONS_SOURCE[i])); 
} 
for(int i = 0; i < PERSONS_TARGET.length; i++) { 
    producer.send(new ProducerRecord<String, String>("stream-target", Long.toString(i + 1L), PERSONS_TARGET[i])); 
} 
producer.close(); 

Die Anwendung meldet jedoch, dass in der RocksDB-Schicht ein Nullzeiger für die Partition vorhanden ist.

[2016.07.17 21: 58: 04.682] Fehler Benutzer bereitgestellt Zuhörer org.apache.kafka.streams.processor.internals.StreamThread $ 1 für die Gruppe-Ströme Personen2 fehlgeschlagen auf Partitionszuweisung (org.apache. kafka.clients.consumer.internals.ConsumerCoordinator) java.lang.NullPointerException bei org.rocksdb.RocksDB.put (RocksDB.java:432) bei org.apache.kafka.streams.state.internals.RocksDBStore.putInternal (RocksDBStore.java:299) bei org.apache.kafka.streams.state.internals.RocksDBStore.access $ 200 (RocksDBStore.java:62) bei org.apache.kafka.streams.state.internals.RocksDBStore $ 3.Rest (RocksDBStore.java:206) bei org.apache.kafka.streams.processor .internals.ProcessorStateManager.restoreActiveState (ProcessorStateManager.java:245) bei org.apache.kafka.streams.processor.internals.ProcessorStateManager.register (ProcessorStateManager.java:210) bei org.apache.kafka.streams.processor.internals .ProcessorContextImpl.register (ProcessorContextImpl.java:116) bei org.apache.kafka.streams.state.internals.RocksDBStore.init (RocksDBStore.java:202)

Antwort

1

gefunden das Problem zurückzuführen auf die Ströme zu sein erstellt im Anwendungscode anstelle des Befehls: -

kafka-topics --create --topic stream-a --replication-factor 1 --partitions 1 

Scheint, dass die Join erfordert, dass die Partitionsinformationen funktionieren.

+0

Sie könnten Ihre eigene Antwort akzeptieren :) –

Verwandte Themen