2016-05-12 6 views
0

Ich bekomme einige Einträge aus dem Stream in Linux-Terminal, weisen sie als lines, brechen sie in words. Aber anstatt sie auszudrucken, möchte ich sie in Cassandra speichern. Ich habe einen Keyspace mit dem Namen ks, mit einer Tabelle darin record genannt. Ich weiß, dass ein Code wie CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); den Job erledigen muss, aber ich denke, ich mache etwas falsch. Kann jemand helfen?Wie speichere ich Daten aus Spark-Streaming in Cassandra mit Java?

Hier ist meine Cassandra ks.record Schema (ich diese Daten durch CQLSH hinzugefügt)

id | birth_date      | name 
----+---------------------------------+----------- 
10 | 1987-12-01 23:00:00.000000+0000 | Catherine 
11 | 2004-09-07 22:00:00.000000+0000 | Isadora 
1 | 2016-05-10 13:00:04.452000+0000 |  John 
2 | 2016-05-10 13:00:04.452000+0000 |  Troy 
12 | 1970-10-01 23:00:00.000000+0000 |  Anna 
3 | 2016-05-10 13:00:04.452000+0000 | Andrew 

Hier mein Java-Code ist:

import com.datastax.spark.connector.japi.CassandraStreamingJavaUtil; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.api.java.function.PairFunction; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import scala.Tuple2; 

import java.util.Arrays; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow; 
import static com.datastax.spark.connector.japi.CassandraStreamingJavaUtil.*; 


public class CassandraStreaming2 { 
    public static void main(String[] args) { 

     // Create a local StreamingContext with two working thread and batch interval of 1 second 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CassandraStreaming"); 
     JavaStreamingContext sc = new JavaStreamingContext(conf, Durations.seconds(1)); 

     // Create a DStream that will connect to hostname:port, like localhost:9999 
     JavaReceiverInputDStream<String> lines = sc.socketTextStream("localhost", 9999); 

     // Split each line into words 
     JavaDStream<String> words = lines.flatMap(
       (FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")) 
     ); 

     words.print(); 
     //CassandraStreamingJavaUtil.javaFunctions(words).writerBuilder("ks", "record").saveToCassandra(); 

     sc.start();    // Start the computation 
     sc.awaitTermination(); // Wait for the computation to terminate 

    } 
} 
+0

Welche Fehler erhalten Sie? – RussS

+0

Fehler 'Fehler: (38, 60) java: Methode writerBuilder in Klasse com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions kann nicht auf bestimmte Typen angewendet werden; erforderlich: java.lang.String, java.lang.String, com.datastax.spark.connector.writer.RowWriterFactory gefunden: java.lang.String, java.lang.String Grund: tatsächliche und formale Argumentlisten unterscheiden sich in der Länge für die Zeile 'CassandraStreamingJavaUtil.javaFunctions (words) .writerBuilder (" ks "," record "). saveToCassandra(); ' – Arsinux

Antwort

1

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md#saving-data-to-cassandra

sich nach den docs, Sie müssen auch eine RowWriter-Factory übergeben. Die gebräuchlichste Methode ist die Verwendung der mapToRow(Class) API, dies ist der fehlende Parameter, der beschrieben wird.

Aber Sie haben ein zusätzliches Problem, Ihr Code spezifiziert die Daten noch nicht so, dass sie in C * geschrieben werden können. Sie haben einen JavaDStream von nur String s. Und eine einzelne String kann nicht zu einer Cassandra-Zeile für Ihr gegebenes Schema gemacht werden.

Grundsätzlich erzählen Sie den Stecker

Write "hello" to CassandraTable (id, birthday, value)

Ohne es zu sagen, wo die hello geht (was soll die ID sein? Was sollte der Geburtstag sein?)

+0

Ich konnte das von Ihnen erwähnte" zusätzliche Problem "nicht lösen. Würdest du mir bitte erklären, wie ich das schaffen soll? – Arsinux

+1

Sie fehlen die erforderlichen Felder. Sie müssen sie bereitstellen. In C * müssen Sie immer mindestens den Primärschlüssel angeben. Das bedeutet, dass die Objekte, die Sie in C * schreiben, einen Wert für "id" und "birthday" haben müssen. Dies könnte bedeuten, dass Sie 'Tuple3 (id, timestamp, string)' schreiben oder eine benutzerdefinierte Klasse mit Feldern wie 'RowClass (Integer id; Zeitmarke birthday, String Value; Ganzzahl getId() ...' – RussS

+0

erstellen. Sie haben Recht löste, was Sie erwähnten, das Stück Code, der geholfen hat, war zu ändern 'JavaDStream words = Linien.flatMap ( (FlatMapFunction ) x -> Arrays.asList (x.split (" ")) ); in 'JavaDStream words = linien.flatMap ( (FlatMapFunction ) x -> Arrays.asList (x.split (" ")) ) .map (s -> neue Word (s));.. Word-Klasse war eine Klasse für die Zuordnung der Cassandra-Tabellen zu dem, was ich vom Terminal bekomme – Arsinux

Verwandte Themen