Ich versuche, Consumer-Producer-Anwendung zu erstellen.Wie speichere ich Daten in Cassandra Tabelle mit Spark Python?
Der Producer der Anwendung wird einige Daten zu bestimmten Themen produzieren. Consumer wird diese Daten aus dem gleichen Thema konsumieren und mit Spark api verarbeiten und diese Daten in der Cassandra-Tabelle speichern.
Ankommende Daten in Zeichenfolgenformat wie unten kommend -
100 = NO | = 101 III | = 102 ,0771387731911 | = 103 -,7076915761 100 = NO | 101 = AAA | = 102 ,8961325446464 | = 103 -0,5465463154
ich habe Verbraucher in den Balg Weise:
from kafka import KafkaConsumer
from StringIO import StringIO
import pandas as pd
from cassandra.cluster import Cluster
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def main():
sc = SparkContext(appName="StreamingContext")
ssc = StreamingContext(sc, 3)
kafka_stream = KafkaUtils.createStream(ssc, "localhost:2181", "sample-kafka-app", {"NO-topic": 1})
raw = kafka_stream.flatMap(lambda kafkaS: [kafkaS])
clean = raw.map(lambda xs: xs[1].split("|"))
my_row = clean.map(lambda x: {
"pk": "uuid()",
"a": x[0],
"b": x[1],
"c": x[2],
"d": x[3],
})
my_row.saveToCassandra("users", "data")
stream.start()
stream.awaitTermination()
if __name__ == "__main__":
main()
Cassandra Tabellenstruktur -
cqlsh:users> select * from data;
pk | a | b | c | d
----+---+---+---+---
CREATE TABLE users.data (
pk uuid PRIMARY KEY,
a text,
b text,
c text,
d text
)
Ich bin unten Fehler auftreten -
Traceback (most recent call last):
File "consumer_no.py", line 84, in <module>
main()
File "consumer_no.py", line 53, in main
my_row.saveToCassandra("users", "data")
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra'
17/04/04 14:29:22 INFO SparkContext: Invoking stop() from shutdown hook
Bin ich auf eine korrekte Art und Weise werde zu erreichen, was ich oben erklären? Wenn nicht, dann gib mir Vorschläge, um das zu erreichen und wenn ja dann, was ist falsch/fehlt im obigen Code?
Mögliche Duplikat von [Speichern von Daten zurück in Cassandra als RDD] (http://stackoverflow.com/questions/35414677/saving-data- back-in-cassandra-as-rdd) –
Mögliches Duplikat von [So speichern Sie Daten in der Cassandra Tabelle mit Sparks saveToCassandra?] (http://stackoverflow.com/questions/43198661/how-to-save-data-in- cassandra-table-using-sparks-savetocassandra) – RussS