2017-04-04 6 views
1

Ich versuche, Consumer-Producer-Anwendung zu erstellen.Wie speichere ich Daten in Cassandra Tabelle mit Spark Python?

Der Producer der Anwendung wird einige Daten zu bestimmten Themen produzieren. Consumer wird diese Daten aus dem gleichen Thema konsumieren und mit Spark api verarbeiten und diese Daten in der Cassandra-Tabelle speichern.

Ankommende Daten in Zeichenfolgenformat wie unten kommend -

100 = NO | = 101 III | = 102 ,0771387731911 | = 103 -,7076915761 100 = NO | 101 = AAA | = 102 ,8961325446464 | = 103 -0,5465463154

ich habe Verbraucher in den Balg Weise:

from kafka import KafkaConsumer 
from StringIO import StringIO 
import pandas as pd 
from cassandra.cluster import Cluster 

from pyspark import SparkConf, SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

def main(): 

    sc = SparkContext(appName="StreamingContext") 
    ssc = StreamingContext(sc, 3) 

    kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "sample-kafka-app", {"NO-topic": 1}) 
    raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS]) 
    clean = raw.map(lambda xs: xs[1].split("|")) 
    my_row = clean.map(lambda x: { 
     "pk": "uuid()", 
     "a": x[0], 
     "b": x[1], 
     "c": x[2], 
     "d": x[3], 
    }) 

    my_row.saveToCassandra("users", "data") 
    stream.start() 
    stream.awaitTermination() 

if __name__ == "__main__": 
    main() 

Cassandra Tabellenstruktur -

cqlsh:users> select * from data; 

pk | a | b | c | d 
----+---+---+---+--- 
CREATE TABLE users.data (
    pk uuid PRIMARY KEY, 
    a text, 
    b text, 
    c text, 
    d text 
) 

Ich bin unten Fehler auftreten -

Traceback (most recent call last): 


File "consumer_no.py", line 84, in <module> 
    main() 
    File "consumer_no.py", line 53, in main 
    my_row.saveToCassandra("users", "data") 
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra' 
17/04/04 14:29:22 INFO SparkContext: Invoking stop() from shutdown hook 

Bin ich auf eine korrekte Art und Weise werde zu erreichen, was ich oben erklären? Wenn nicht, dann gib mir Vorschläge, um das zu erreichen und wenn ja dann, was ist falsch/fehlt im obigen Code?

+0

Mögliche Duplikat von [Speichern von Daten zurück in Cassandra als RDD] (http://stackoverflow.com/questions/35414677/saving-data- back-in-cassandra-as-rdd) –

+0

Mögliches Duplikat von [So speichern Sie Daten in der Cassandra Tabelle mit Sparks saveToCassandra?] (http://stackoverflow.com/questions/43198661/how-to-save-data-in- cassandra-table-using-sparks-savetocassandra) – RussS

Antwort

0

Anstatt direkt zu versuchen, Ihren TransformedDstream in Cassandra zu speichern, sollten Sie jede RDD aus diesem DStream in Cassandra speichern.

Code sollte funktionieren, wenn Sie so etwas tun:

my_row.foreachRDD(lambda x: x.saveToCassandra("users", "data")) 
Verwandte Themen