2017-08-25 3 views
0

Hallo, ich schreibe einen kleinen Cassandra-Trigger, der nach dem Einfügen in eine bestimmte Tabelle Informationen an kafka sendet. Hier ist meine Trigger-Code:java.lang.NoClassDefFoundError während der Erstellung von Cassandra-Triggern

public class InsertDataTrigger implements ITrigger { 

    public Collection<Mutation> augment(Partition update) { 

     //checking if trigger works and some debug info; 
     SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); 
     System.out.println("Hello " + dateFormat.format(new Date())); 
     System.out.println("This Insert Data Trigger"); 
     System.out.println("default charset " + Charset.defaultCharset());  //IMPORTANT check if it's important 

     //here we're gonna build the message to kafka based on inserted data 
     try { 
      UnfilteredRowIterator it = update.unfilteredIterator(); 
      CFMetaData cfMetaData = update.metadata(); 

      System.out.println("PartitionKey " + new String(update.partitionKey().getKey().array())); 
      System.out.println("update.metadata().clusteringColumns().toString() " + update.metadata().clusteringColumns().toString()); 

      while (it.hasNext()) { 
       JSONObject message = new JSONObject(); 

       Unfiltered un = it.next(); 
       Clustering clt = (Clustering) un.clustering(); 

       message.put("partitionkey", new String(update.partitionKey().getKey().array())); 

       System.out.println("clt.toString(cfMetaData) " + clt.toString(cfMetaData)); 
       System.out.println("clt.getRawValues() " + new String(clt.getRawValues()[0].array())); 
       System.out.println("partition.columns().toString() " + update.columns().toString()); 

       message.put("datetime", new String(clt.getRawValues()[0].array())); 

       Iterator<Cell> cells = update.getRow(clt).cells().iterator(); 

       while (cells.hasNext()) { 
        Cell cell = cells.next(); 
        System.out.println("cell.column().name.toString() " + cell.column().name.toString()); 
        System.out.println("cell.toString()" + cell.toString()); 
        Double x = cell.value().getDouble(); 
        System.out.println("cell.value().getDouble() " + x); 
        //if(cell.column().name.toString() == "value") 
        System.out.println(x); 
        message.put(cell.column().name.toString(), x); 
        //else 
        // message.put(cell.column().name.toString(),cell.value().toString()); 
       } 
       System.out.println("un.toString()" + un.toString(cfMetaData)); 

       if (!message.isEmpty()) { 
        System.out.println(message.toString()); 

        //Sending data to kafka 
        Properties props = new Properties(); 
        props.put("bootstrap.servers", "localhost:9092"); 
        props.put("acks", "all"); 
        props.put("retries", 0); 
        props.put("batch.size", 16384); 
        props.put("linger.ms", 1); 
        props.put("buffer.memory", 33554432); 
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

        Producer<String, String> producer = new KafkaProducer<>(props); 
        producer.send(new ProducerRecord<>("test", message.toString()));//move topic name to some properties 
        producer.close(); 
       } 


      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     return Collections.emptyList(); 
    } } 

Und hier ist meine pom-Datei:

<?xml version="1.0" encoding="UTF-8"?> 
    <project xmlns="http://maven.apache.org/POM/4.0.0" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.1</version> 
       <configuration> 
        <source>1.8</source> 
        <target>1.8</target> 
       </configuration> 
      </plugin> 
     </plugins> 
    </build> 

    <modelVersion>4.0.0</modelVersion> 

    <groupId>io.github.carldata</groupId> 
    <artifactId>InsertDataTrigger</artifactId> 
    <version>1.0</version> 

    <dependencies> 
     <!-- https://mvnrepository.com/artifact/org.apache.cassandra/cassandra-all --> 
     <dependency> 
      <groupId>org.apache.cassandra</groupId> 
      <artifactId>cassandra-all</artifactId> 
      <version>3.11.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.11.0.0</version> 
     </dependency> 
    </dependencies> 

</project> 

Das Projekt baut fein und schafft eine JAR-Datei, aber wenn ich versuche Auslöser zu schaffen, in cassandra schlägt es mit oben genannten Ausnahme .

Antwort

2

Höchstwahrscheinlich befindet sich das kafka-clients jar nicht im Cassandra lib Verzeichnis. Es sei denn, Ihr Projekt enthält diese Abhängigkeit (z. B. ein Fat/Uber-Glas).

Sie Mai haben Probleme mit Abhängigkeitskonflikte in der Kafka-Clients Jar und die Cassandra Abhängigkeiten. Insbesondere org.xerial.snappy snappy-java haben unterschiedliche Versionen. Es kann funktionieren, aber es ist etwas, auf das Sie achten sollten. Wenn es ein Problem ist, können Sie Ihre eigenen Kafka-Clients mit ihren Abhängigkeiten schattieren, damit sie nicht in Konflikt geraten.

+0

Vielen Dank Ich habe Uber Glas erstellt und es löste das Problem. – CodeDog

Verwandte Themen