Ich bekomme einige Einträge aus dem Stream in Linux-Terminal, weisen sie als lines
, brechen sie in words
. Aber anstatt sie auszudrucken, möchte ich sie in Cassandra speichern. Ich habe einen Keyspace mit dem Namen ks
, mit einer Tabelle darin record
genannt. Ich weiß, dass ein Code wie CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();
den Job erledigen muss, aber ich denke, ich mache etwas falsch. Kann jemand helfen?Wie speichere ich Daten aus Spark-Streaming in Cassandra mit Java?
Hier ist meine Cassandra ks.record Schema (ich diese Daten durch CQLSH hinzugefügt)
id | birth_date | name
----+---------------------------------+-----------
10 | 1987-12-01 23:00:00.000000+0000 | Catherine
11 | 2004-09-07 22:00:00.000000+0000 | Isadora
1 | 2016-05-10 13:00:04.452000+0000 | John
2 | 2016-05-10 13:00:04.452000+0000 | Troy
12 | 1970-10-01 23:00:00.000000+0000 | Anna
3 | 2016-05-10 13:00:04.452000+0000 | Andrew
Hier mein Java-Code ist:
import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import java.util.Arrays;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*;
public class CassandraStreaming2 {
public static void main(String[] args) {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming");
JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1));
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999);
// Split each line into words
JavaDStream<String> words = lines.flatMap(
(FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" "))
);
words.print();
//CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra();
sc.start(); // Start the computation
sc.awaitTermination(); // Wait for the computation to terminate
}
}
Welche Fehler erhalten Sie? – RussS
Fehler 'Fehler: (38, 60) java: Methode writerBuilder in Klasse com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions kann nicht auf bestimmte Typen angewendet werden; erforderlich: java.lang.String, java.lang.String, com.datastax.spark.connector.writer.RowWriterFactory gefunden: java.lang.String, java.lang.String Grund: tatsächliche und formale Argumentlisten unterscheiden sich in der Länge für die Zeile 'CassandraStreamingJavaUtil.javaFunctions (words) .writerBuilder (" ks "," record "). saveToCassandra(); ' –
Arsinux