2017-09-29 1 views
1

Ich versuche, eine Anwendung mit Apache Kafka, Saprk, Scala und Cassandra zu erstellen. Aber ich habe eine Menge Probleme damit, die richtigen umsetzbaren Versionen dieser Tools zu bekommen.Kafka Spark Scala Cassandra Kompatible Versionen

Kann mir bitte jemand mitteilen, welche Versionen ich verwenden soll?

Vielen Dank im Voraus ..

+0

Bitte fügen Sie, welche Versionen Sie verwenden und wie Sie Ihre 'pom.xml' aussieht. – philantrovert

+0

Ich habe Spark 2.2, Kafka 0.11, scala 2.11 verwendet, aber beim Erstellen der APIs gab es viele Probleme. Deshalb frage ich, von welchen Versionen sollte ich jetzt anfangen? –

Antwort

1

Hier ist die Liste der Versionen für Bibliotheken, die wir verwendet haben:

<dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.2.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.2.1</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.cassandra</groupId> 
     <artifactId>apache-cassandra</artifactId> 
     <version>3.10</version> 
    </dependency> 
    <dependency> 
     <groupId>com.datastax.spark</groupId> 
     <artifactId>spark-cassandra-connector_2.11</artifactId> 
     <version>2.0.2</version> 
    </dependency> 

Das Hauptproblem, das Sie mit der Kompatibilität unterscheiden scala Versionen (2.10 Gesicht. * oder 2.11. *). Sie müssen sich darum kümmern und sehen, dass alle Abhängigkeiten dieselbe scala-Version verwenden. Ich denke du kannst ohne Probleme alle Versionen auf den neusten Stand bringen, nur wenn du dich überall um die gleichen Scala-Versionen kümmerst. Hier

ist auch Beispielcode, die Ihnen mit Start helfen:

 public static void main(String[] args) throws InterruptedException { 
      JavaStreamingContext jssc = new JavaStreamingContext(getSparkConfiguration(), Durations.seconds(5)); 

      JavaInputDStream<ConsumerRecord<String, LoggingEvent>> messages = 
      KafkaUtils.createDirectStream(
        jssc, 
        LocationStrategies.PreferConsistent(), 
        ConsumerStrategies.<String, LoggingEvent>Subscribe(Arrays.asList("some_topic"), getKafkaParams("localhost:9092", "some_logging_group)) 
      ); 

      JavaDStream<LoggingEvent> loggingRecords = messages.map(
      (Function<ConsumerRecord<String, LoggingEvent>, LoggingEvent>) message -> message.value() 
    ); 

      CassandraStreamingJavaUtil.javaFunctions(loggingRecords).writerBuilder("some_space", "some_table", 
        CassandraJavaUtil.mapToRow(LoggingEvent.class)).saveToCassandra(); 

      jssc.start(); 
      jssc.awaitTermination(); 
} 

Mapping im Anschluss durch Zuordnen der Felder in der Klasse mit Tabellenspalten erfolgt.

Für die Einrichtung haben wir Ansible verwendet, und die Distributionsversionen für Archive waren dieselben wie in der Bibliotheksabhängigkeitsliste.

+0

scala :-). – mtk

0

Sollten Sie Interesse an der sbt Version

libraryDependencies ++= { 

    val sparkV = "2.1.0" 
    val cassandraV = "2.0.0-M3" 

    Seq(
    "org.apache.spark"  %% "spark-core" % sparkV, 
    "org.apache.spark"  %% "spark-streaming" % sparkV, 
    "org.apache.spark"  %% "spark-streaming-kafka-0-10" % sparkV, 
    "org.apache.spark"  %% "spark-sql-kafka-0-10" % sparkV, 
    "org.apache.spark"  %% "spark-sql" % sparkV, 
    "org.apache.spark"  %% "spark-hive" % sparkV, 
    "com.datastax.spark" %% "spark-cassandra-connector" % cassandraV, 
    "com.datastax.cassandra" % "cassandra-driver-core" % "3.2.0", 
    "org.rogach"   %% "scallop" % "2.1.2" 
) 

} 
+0

Und Scala-Version ist 2.11.8 für die Abhängigkeiten oben. – geo

Verwandte Themen