Hier ist die Liste der Versionen für Bibliotheken, die wir verwendet haben:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>apache-cassandra</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.2</version>
</dependency>
Das Hauptproblem, das Sie mit der Kompatibilität unterscheiden scala Versionen (2.10 Gesicht. * oder 2.11. *). Sie müssen sich darum kümmern und sehen, dass alle Abhängigkeiten dieselbe scala-Version verwenden. Ich denke du kannst ohne Probleme alle Versionen auf den neusten Stand bringen, nur wenn du dich überall um die gleichen Scala-Versionen kümmerst. Hier
ist auch Beispielcode, die Ihnen mit Start helfen:
public static void main(String[] args) throws InterruptedException {
JavaStreamingContext jssc = new JavaStreamingContext(getSparkConfiguration(), Durations.seconds(5));
JavaInputDStream<ConsumerRecord<String, LoggingEvent>> messages =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, LoggingEvent>Subscribe(Arrays.asList("some_topic"), getKafkaParams("localhost:9092", "some_logging_group))
);
JavaDStream<LoggingEvent> loggingRecords = messages.map(
(Function<ConsumerRecord<String, LoggingEvent>, LoggingEvent>) message -> message.value()
);
CassandraStreamingJavaUtil.javaFunctions(loggingRecords).writerBuilder("some_space", "some_table",
CassandraJavaUtil.mapToRow(LoggingEvent.class)).saveToCassandra();
jssc.start();
jssc.awaitTermination();
}
Mapping im Anschluss durch Zuordnen der Felder in der Klasse mit Tabellenspalten erfolgt.
Für die Einrichtung haben wir Ansible verwendet, und die Distributionsversionen für Archive waren dieselben wie in der Bibliotheksabhängigkeitsliste.
Bitte fügen Sie, welche Versionen Sie verwenden und wie Sie Ihre 'pom.xml' aussieht. – philantrovert
Ich habe Spark 2.2, Kafka 0.11, scala 2.11 verwendet, aber beim Erstellen der APIs gab es viele Probleme. Deshalb frage ich, von welchen Versionen sollte ich jetzt anfangen? –