2016-11-25 2 views
0

ich eine RDD (test_rdd) haben, wie untenPyspark speichern RDD zu Cassandra

[ { 'user_lname': u'TEst1' , 'user_id': u'2aa8ae30-c0e5-48bb-AB16-a2ed2e78c8c3' , 'user_phone': u'1234567890 ',' benutzer_name ': u'TestingTesting2', 'betrag': 1222, 'event_timestamp': u'2016-09-29T07: 49: 50.866 + 00: 00 '},

{'user_lname': u'TEst2 ',' Benutzer-ID ': u'2aa8ae30-c0e5-48bb-ac16-a2ed2e78c8c3', 'Benutzer-Telefon': u'1234567891 ',' Benutzername ': u'TestingTesting', 'Betrag': 12 , 'event_timestamp': u'2016-10-27T07: 49: 50.866 + 00: 00 '},

{' Benutzer_Name ': u'TEst3', 'u ser_id ': u'2aa8ae30-c1e5-48bb-ab16-a2ed2e78c8c3', 'benutzer_phone': u'1234567892 ',' benutzer_name ': u'TestingTesting3', 'betrag': 122, 'event_timestamp': u'2016-09- 27T07: 49: 50.866 + 00: 00 '} ]

Ich möchte die obige RDD in einer Cassandra-Tabelle speichern.
Ich erhalte die unten Fehlermeldung, wenn ich
test_rdd.saveToCassandra ("keyspace1", "Tabelle1")

Traceback (jüngste Aufforderung zuletzt) ​​verwenden:
File „/var/spark/test/k.py “, Linie 179, in
parsed_data.saveToCassandra ("keyspace1", "Tabelle 1")
Attribute: 'PipelinedRDD' Objekt hat kein Attribut 'saveToCassandra'

Antwort

0

Entweder

oder

  • folgen Sie den Anweisungen für die offizielle spark-cassandra-connector
  • konvertieren DataFrame (toDF)
  • Schreib Dataframe

    df.write.format("org.apache.spark.sql.cassandra").options(
        table=table, keyspace=keyspace 
    ).save() 
    
+1

Vielen Dank. Ich habe die 2. Methode benutzt. Was ist der Unterschied zwischen Pyspark-Cassandra und Spark-Cassandra-Connector. –