Ich versuche, die Anzahl der Partitionen auf 2 aus dem Code, und ich habe Einzelknoten-Setup, (1 zookeeper, 1kafka). Wenn ich die Nachricht konsumiere, sehe ich, dass Kafka nur eine Partition verwendet, um die Daten zu speichern. Muss ich Änderungen am Setup vornehmen, um mehrere Partitionen zu haben?Einrichten mehrerer Partitionen in Apache Kafka
private void setupZookeeper(String[] topicList){
ZkClient zkClient = null;
ZkUtils zkUtils = null;
try {
String[] zookeeperHosts = {"localhost:2181"}; // If multiple zookeeper then -> String zookeeperHosts = "192.168.20.1:2181,192.168.20.2:2181";
int sessionTimeOutInMs = 15 * 1000; // 15 secs
int connectionTimeOutInMs = 10 * 1000; // 10 secs
//String topicName = "testTopic";
int noOfPartitions = 2;
int noOfReplication = 1;
for(String zookeeper:zookeeperHosts){
zkClient = new ZkClient(zookeeper, sessionTimeOutInMs, connectionTimeOutInMs, ZKStringSerializer$.MODULE$);
zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false);
for(String topicName: topicList){
System.out.println("Setting no of partitions ="+noOfPartitions + "for topic" + topicName);
AdminUtils.createTopic(zkUtils, topicName, noOfPartitions, noOfReplication,
producerConfig(),RackAwareMode.Disabled$.MODULE$);
}
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
if (zkClient != null) {
zkClient.close();
}
}
Mein producerConfig, sieht wie folgt aus:
private Properties producerConfig() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
//props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}