0

Ich implementiere ein Lambda-Architektur-System für die Stream-Verarbeitung.Spark Streaming: Wie lade ich eine Pipeline in einem Stream?

Ich habe kein Problem eine Pipeline mit Gridsearch in Spark-Batch erstellen:

pipeline = Pipeline(stages=[data1_indexer, data2_indexer, ..., assembler, logistic_regressor]) 

paramGrid = (
ParamGridBuilder() 
.addGrid(logistic_regressor.regParam, (0.01, 0.1)) 
.addGrid(logistic_regressor.tol, (1e-5, 1e-6)) 
...etcetera 
).build() 

cv = CrossValidator(estimator=pipeline, 
       estimatorParamMaps=paramGrid, 
       evaluator=BinaryClassificationEvaluator(), 
       numFolds=4) 

pipeline_cv = cv.fit(raw_train_df) 
model_fitted = pipeline_cv.getEstimator().fit(raw_validation_df) 
model_fitted.write().overwrite().save("pipeline") 

aber ich kann nicht scheinen zu finden, wie die Pipeline in dem Spark-Streaming-Prozess zu stopfen. Ich verwende kafka als DSTREAM Quelle und mein Code ab sofort wie folgt dar:

import json 
from pyspark.ml import PipelineModel 
from pyspark.streaming.kafka import KafkaUtils 

von pyspark.streaming Import Streaming

ssc = StreamingContext(sc, 1) 
kafkaStream = KafkaUtils.createStream(ssc, "localhost:2181", "spark- streaming-consumer", {"kafka_topic": 1}) 

model = PipelineModel.load('pipeline/') 
parsed_stream = kafkaStream.map(lambda x: json.loads(x[1])) 

CODE MISSING GOES HERE  

ssc.start() 
ssc.awaitTermination() 

und jetzt brauche ich irgendwie tun

zu finden

Basierend auf der Dokumentation here (obwohl es sehr, sehr veraltet aussieht) scheint es, dass Ihr Modell die Methode predict implementieren muss, um es auf einem RDD-Objekt (und hoffentlich auf einem Kafkastream?) Zu verwenden.

Wie könnte ich die Pipeline im Streaming-Kontext verwenden? Die neu geladen PipelineModel scheint nur transform

zu implementieren

Ist das den einzigen Weg bedeutet Batch-Modelle in einem Streaming-Kontext zu verwenden, ist reine Modelle zu verwenden, und keine Rohrleitungen?

Antwort

1

Ich habe einen Weg gefunden, eine Spark-Pipeline in Spark-Streaming zu laden.

Diese Lösung funktioniert für Spark 2.0, da weitere Versionen wahrscheinlich eine bessere Lösung implementieren werden.

Die Lösung, die ich gefunden habe, transformiert die Streaming-Daten in Dataframes mit der Methode , in der Sie dann die Methode pipeline.transform anwenden können.

Diese Art der Dinge ist schrecklich ineffizient.

# we load the required libraries 
from pyspark.sql.types import (
     StructType, StringType, StructField, LongType 
     ) 
from pyspark.sql import Row 
from pyspark.streaming.kafka import KafkaUtils 

#we specify the dataframes schema, so spark does not have to do reflections on the data. 

pipeline_schema = StructType(
    [ 
     StructField("field1",StringType(),True), 
     StructField("field2",StringType(),True), 
     StructField("field3", LongType(),True) 
] 
) 

#We load the pipeline saved with spark batch 
pipeline = PipelineModel.load('/pipeline') 

#Setup usual spark context, and spark Streaming Context 
sc = spark.sparkContext 
ssc = StreamingContext(sc, 1) 

#On my case I use kafka directKafkaStream as the DStream source 
directKafkaStream = KafkaUtils.createDirectStream(ssc, [QUEUE_NAME], {"metadata.broker.list": "localhost:9092"}) 

def handler(req_rdd): 
    def process_point(p): 
     #here goes the logic to do after applying the pipeline 
     print(p) 
    if req_rdd.count() > 0: 
     #Here is the gist of it, we turn the rdd into a Row, then into a df with the specified schema) 
     req_df = req_rdd.map(lambda r: Row(**r)).toDF(schema=pipeline_schema) 
     #Now we can apply the transform, yaaay 
     pred = pipeline.transform(req_df) 
     records = pred.rdd.map(lambda p: process_point(p)).collect() 

Hoffe das hilft.

Verwandte Themen