2015-10-29 6 views
5

Ich versuche verzweifelt, Cassandra mit pyspark zu verbinden, aber ich kann es nicht zur Arbeit bringen. Ich bin ziemlich neu in Cassandra und Funke, also könnte ich etwas ziemlich Einfaches vermissen.Cassandra mit Spark (pyspark) verbinden/integrieren

Ich bin ein bisschen verwirrt durch die verschiedenen Erklärungen online, aber von dem, was ich verstanden habe, wäre der einfachste Weg, "Spark-Pakete" zu verwenden? (http://spark-packages.org/package/TargetHolding/pyspark-cassandra)

Also, mit dem folgenden Befehl:

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

Bin ich richtig in meinem Verständnis, dass ich brauche keine Pakete zum Download, wenn ich Funken Pakete verwenden, wie oben beschrieben?

im myPysparkFile.py habe ich versucht, die folgenden zwei Versionen, von denen keines mir für mich arbeiten:

Version 1, die ich von Seite 14 in http://www.slideshare.net/JonHaddad/intro-to-py-spark-and-cassandra bekam:

"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark_cassandra import CassandraSparkContext,Row 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = CassandraSparkContext(conf=conf) 

rdd = sc.cassandraTable("test", "words") 

als Fehler i erhalten:

ImportError: No module named pyspark_cassandra 

Version 2 (die von inspiriert: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/15_python.md):

012.351.
"SparkCassandraTest.py" 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf() 
conf.setMaster("local[4]") 
conf.setAppName("Spark Cassandra") 
conf.set("spark.cassandra.connection.host","http://127.0.0.1") 

sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

sqlContext.read\ 
    .format("org.apache.spark.sql.cassandra")\ 
    .options(table="kv", keyspace="test")\ 
    .load().show() 

, die mir die folgende Störung gibt:

py4j.protocol.Py4JJavaError: An error occurred while calling o28.load. 
: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; 
    at org.apache.spark.sql.cassandra.DefaultSource$.<init>(DefaultSource.scala:138) 
    at org.apache.spark.sql.cassandra.DefaultSource$.<clinit>(DefaultSource.scala) 
    at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:56) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Ich weiß wirklich nicht, was ich falsch mache und würde jede Hilfe dankbar. Was ist der Unterschied zwischen der Verwendung von Version 1 oder Version 2? Gibt es Vor- oder Nachteile zwischen den beiden Versionen?

Auch alle weiteren Referenzen, wie man Funken mit Cassandra am besten integriert und verwendet, würden sehr geschätzt.

Btw, Cassandra läuft auf meinem PC mit den Grundkonfigurationen auf Port 7000.

Dank.

+0

Was ist die Spark-Version – Abhi

Antwort

8

Pyspark_Cassandra ist ein anderes Paket als der Spark-Cassandra-Connector. Es enthält eine Version des SCC, ist jedoch nicht austauschbar. Bei der Installation von SCC wird pyspark_cassandra nicht installiert. Dieses Paket ist erforderlich, wenn Sie sc.cassandraTable() von pyspark verwenden möchten.

Durch die Installation von SCC haben Sie die Möglichkeit, Dataframes in pyspark zu verwenden, was der effizienteste Weg ist, mit C * von pyspark umzugehen. Dies ist das gleiche wie in Ihrem V2-Beispiel. Wenn es fehlschlägt, scheint es, als hätten Sie V2 nicht mit dem Befehl --package gestartet.

Der Grund ist es fehlerhaft sein kann, ist, dass Sie die Scala 2.11 Version der Bibliothek angeben hier

./bin/spark-submit --packages com.datastax.spark:spark-cassandra-connector_2.11:1.5.0-M2 ../Main/Code/myPysparkFile.py 

und werden höchstwahrscheinlich keine Scala 2.10 Version Spark läuft (die Standard-Download ist 2.10)

+1

Sie hatten Recht, mit dem gleichen Befehl mit 2.10 löste das Problem. Danke vielmals. – Kito

+0

kann ich eine Verbindung zu Cassandra 3.3 mit dem gleichen Treiber herstellen @RussS – Abhi

+0

Wissen Sie, dass es möglich ist, eine Tabelle aus dem Cassandra-Anschluss zu erstellen? –

Verwandte Themen