2017-06-28 2 views
0

Ich arbeite an einer benutzerdefinierten Kafka-Partitionierungsklasse. Hier versuche ich, die Daten in separate Partitionen zu schieben. Mein Kafka Produzent Klasse:Fehler in meiner benutzerdefinierten Kafka-Partitionierungsklasse

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

public class KafkaCustomPartitioner { 
    public static void main(String[] args) { 
     long events = Long.parseLong(args[0]); 
     int blocks = Integer.parseInt(args[1]); 
     Random rnd = new Random(); 

     Properties props = new Properties(); 
     props.put("metadata.broker.list", "localhost:9092"); 
     props.put("serializer.class","kafka.serializer.StringEncoder"); 
     props.put("key.serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("partitioner.class","com.kafka.partdecider.CustomPartitioner"); 
     props.put("producer.type", "sync"); 
     props.put("request.required.acks","1"); 

     ProducerConfig config = new ProducerConfig(props); 
     Producer producer = new Producer(config); 

     for(int nBlocks=0; nBlocks<blocks; nBlocks++) { 
      for(long nEvents=0; nEvents<events; nEvents++) { 
       long runTime = new Date().getTime(); 
       String msg = runTime + ": " + (50+nBlocks) + ": " + nEvents + ": " + rnd; 
       KeyedMessage<String, String> data = new KeyedMessage<String, String>("CustPartTopic",String.valueOf(nBlocks),msg); 
       producer.send(data); 
      } 
     } 
     producer.close(); 
    } 
} 

Kunden Partitioner Klasse:

import kafka.producer.Partitioner; 

public class CustomPartitioner implements Partitioner { 

    public int partition(Object key, int arg1) { 
     String receivingkey = (String) key; 
     long id = Long.parseLong(receivingkey); 
     return (int) (id%arg1); 
    } 
} 

Die Argumente Abschnitt des Projekts hat die Werte: 3 2 I "ArrayOutOfBoundsException" auf dieser Linie bin immer wenn ich die Klasse laufen:

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0 
    at com.kafka.custompartitioner.KafkaCustomPartitioner.main(KafkaCustomPartitioner.java:13) 

der Fehler wird in der Zeile angezeigt: long events = Long.parseLong(args[0]); Aber ich verstehe nicht, Warum gibt diese Zeile den Fehler? Kann mir jemand sagen, wie ich das beheben kann?

+0

Nachgefragt werden Sie mit Kafka-Version arbeiten, bevor Kafka 0.8.2? Sie verwenden eine sehr alte API. – ppatierno

+0

Ich verwende die Version 0.10 (kafka_2.11-0.10.2.0). Können Sie mir sagen, welche Korrekturen ich für die neueste Version vornehmen muss? Dies ist die Abhängigkeit in meinem pom.xml \t \t org.apache.kafka \t kafka_2.11 \t 0.9.0.0 \t Sidhartha

Antwort

1

Dies funktioniert für mich, sind die API ganz anders aus:

package mypackage.io; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 

import java.util.Date; 
import java.util.Properties; 
import java.util.Random; 
import java.util.concurrent.ExecutionException; 

public class KafkaCustomPartitioner { 

public static void main(String[] args) throws InterruptedException, ExecutionException { 

    long events = Long.parseLong(args[0]); 
    int blocks = Integer.parseInt(args[1]); 
    Random rnd = new Random(); 

    Properties props = new Properties(); 
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "mypackage.io.CustomPartitioner"); 
    props.put(ProducerConfig.ACKS_CONFIG, "1"); 

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 

    for(int nBlocks=0; nBlocks<blocks; nBlocks++) { 

     for(long nEvents=0; nEvents<events; nEvents++) { 

      long runTime = new Date().getTime(); 
      String msg = runTime + ": " + (50+nBlocks) + ": " + nEvents + ": " + rnd; 
      producer.send(new ProducerRecord<String, String>("CustPartTopic", String.valueOf(nBlocks), msg)).get(); 
     } 
    } 
    producer.close(); 
    } 
} 

dann die benutzerdefinierte Partitionierungs

package mypackage.io; 

import org.apache.kafka.clients.producer.Partitioner; 
import org.apache.kafka.common.Cluster; 

import java.util.Map; 

public class CustomPartitioner implements Partitioner { 

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 

    String receivingkey = (String) key; 
    long id = Long.parseLong(receivingkey); 
    int numPartitions = cluster.availablePartitionsForTopic(topic).size(); 
    return (int) (id % numPartitions); 
} 

public void close() { 

} 

public void configure(Map<String, ?> map) { 

} 
} 
+0

Es arbeitet jetzt. Und wo übergeben wir hier die Anzahl der Partitionen? – Sidhartha

+0

übergeben Sie es nicht. Sie haben das Thema mit der Anzahl der erstellten Partitionen erstellt. Dann wird das Cluster-Objekt zum Abrufen der Anzahl der Partitionen für dieses Thema verwendet. – ppatierno

+0

Mein schlechtes. Ich meinte wo geben wir den Parameter für die Anzahl der Partitionen an? Das wird meine Programmanforderung beenden. – Sidhartha

Verwandte Themen