2017-10-25 1 views
1

Ich versuche, Abfrage in Spark mit der Sprache scala zu erstellen, die Daten sind in der Cassandra-Datenbank als Tabelle verfügbar. In Cassandra Tabelle habe ich zwei Schlüssel, 1) Primärschlüssel 2) VerteilungsschlüsselPartition Schlüsselprädikat muss alle Partition Schlüsselspalten enthalten

Cassandra DDL so etwas wie dieses wird:

CREATE TABLE A.B (
    id1 text, 
    id2 text, 
    timing timestamp, 
    value float, 
    PRIMARY KEY ((id1, id2), timing) 
) WITH CLUSTERING ORDER BY (timing DESC) 

Mein Funken Programmierung:

val conf = new SparkConf(true).set("spark.cassandra.connection.host","192.168.xx.xxx").set("spark.cassandra.auth.username","test").set("spark.cassandra.auth.password","test") 
val sc = new SparkContext(conf) 
var ctable = sc.cassandraTable("A", "B").select("id1","id2","timing","value").where("id1=?","1001") 

Wenn ich abfragen das gleiche für "Wert" Ich erhalte das Ergebnis, aber wenn ich nach ID1 oder ID2 abfrage, erhalte ich einen Fehler.

Fehler erhalten: java.lang.UnsupportedOperationException: Das Partitionsschlüsselprädikat muss alle Partitionsschlüsselspalten oder Partitionsschlüsselspalten enthalten, die indiziert werden müssen. Fehlende Spalten: ID2

Ich benutze funke-2.2.0-bin-hadoop2.7, Cassandra 3.9, scala 2.11.8.

Vielen Dank im Voraus.

Antwort

0

Der gewünschte Ausgang wurde mit folgendem Programm ermittelt.

val conf = new SparkConf(true).set("spark.cassandra.connection.host","192.168.xx.xxx").set("spark.cassandra.auth.username","test").set("spark.cassandra.auth.password","test") 
val sc = new SparkContext(conf) 
var ctable = sc.cassandraTable("A", "B").select("id1","id2","timing","value").where("id1=?","1001").where("id2=?","1002") 

So können wir auf den Partitionsschlüssel in der Cassandra-Datenbank über Spark zugreifen.