Ich versuche Kafka-Daten zu Funken-Streaming ziehen, Laden Sie ein bereits gebautes Modell von HDFS und dann Vorhersagen mit Kafka-Nachricht.Pyspark prognostiziert mit kafka direct-Stream
ich verschiedene Methoden ausprobiert, aber ich bin wegen eines Typeerror bei model.predict stecken: Can not Typ in Vektor konvertieren
Die Daten aus kafka empfangenen Komma getrennt schweben.
Hier ist mein Code:
sc = SparkContext(appName="PythonStreamingKafkaForecast")
ssc = StreamingContext(sc, 10)
# Create stream to get kafka messages
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["my_topic"], {"metadata.broker.list": "kafka_ip"})
features = directKafkaStream.foreachRDD(lambda rdd: rdd.map(lambda s: Vectors.dense(s[1].split(","))))
model = LinearRegressionModel.load(sc, "hdfs://hadoop_ip/model.model")
#Predict
predicted = model.predict(features)
Ich habe auch versucht diese:
lines = directKafkaStream.map(lambda x: x[1])
features = lines.map(lambda data: Vectors.dense([float(c) for c in data.split(',')]))
Aber diesmal bietet vom Typ TransformedStream die für preidctions wird nicht funktionieren ...
Können Sie mir sagen, was ich falsch mache?
Vielen Dank für Ihre Hilfe