Ich implementiere ein Lambda-Architektur-System für die Stream-Verarbeitung.Spark Streaming: Wie lade ich eine Pipeline in einem Stream?
Ich habe kein Problem eine Pipeline mit Gridsearch in Spark-Batch erstellen:
pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor])
paramGrid = (
ParamGridBuilder()
.addGrid(logistic_regressor.regParam, (0.01, 0.1))
.addGrid(logistic_regressor.tol, (1e-5, 1e-6))
...etcetera
).build()
cv = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=BinaryClassificationEvaluator(),
numFolds=4)
pipeline_cv = cv.fit(raw_train_df)
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df)
model_fitted.write().overwrite().save("pipeline")
aber ich kann nicht scheinen zu finden, wie die Pipeline in dem Spark-Streaming-Prozess zu stopfen. Ich verwende kafka als DSTREAM Quelle und mein Code ab sofort wie folgt dar:
import json
from pyspark.ml import PipelineModel
from pyspark.streaming.kafka import KafkaUtils
von pyspark.streaming Import Streaming
ssc = StreamingContext(sc, 1)
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1})
model = PipelineModel.load('pipeline/')
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1]))
CODE MISSING GOES HERE
ssc.start()
ssc.awaitTermination()
und jetzt brauche ich irgendwie tun
zu findenBasierend auf der Dokumentation here (obwohl es sehr, sehr veraltet aussieht) scheint es, dass Ihr Modell die Methode predict implementieren muss, um es auf einem RDD-Objekt (und hoffentlich auf einem Kafkastream?) Zu verwenden.
Wie könnte ich die Pipeline im Streaming-Kontext verwenden? Die neu geladen PipelineModel scheint nur transform
zu implementierenIst das den einzigen Weg bedeutet Batch-Modelle in einem Streaming-Kontext zu verwenden, ist reine Modelle zu verwenden, und keine Rohrleitungen?