3

Wenn ich ein Spark-Modell erstellen und es aufrufen, benötigen die Vorhersagen einige zehn ms, um zurückzukehren. Wenn ich jedoch das gleiche Modell speichere, dann lade es, die Vorhersagen dauern viel länger. Gibt es eine Art Cache, den ich benutzen sollte?So laden Sie ein Spark-Modell für effiziente Vorhersagen

model.cache() nach dem Laden funktioniert nicht, da das Modell keine RDD ist.

Dies funktioniert gut:

from pyspark.mllib.recommendation import ALS 
from pyspark import SparkContext 
import time 

sc = SparkContext() 

# Some example data 
r = [(1, 1, 1.0), 
    (1, 2, 2.0), 
    (2, 1, 2.0)] 
ratings = sc.parallelize(r) 
model = ALS.trainImplicit(ratings, 1, seed=10) 

# Call model and time it 
now = time.time() 
for t in range(10): 
    model.predict(2, 2) 

elapsed = (time.time() - now)*1000/(t+1) 

print "Average time for model call: {:.2f}ms".format(elapsed) 

model.save(sc, 'my_spark_model') 

Ausgang: Average time for model call: 71.18ms

Wenn ich folgendes ausführen, nehmen die Vorhersagen viel mehr Zeit:

from pyspark.mllib.recommendation import MatrixFactorizationModel 
from pyspark import SparkContext 
import time 

sc = SparkContext() 

model_path = "my_spark_model" 
model = MatrixFactorizationModel.load(sc, model_path) 

# Call model and time it 
now = time.time() 
for t in range(10): 
    model.predict(2, 2) 

elapsed = (time.time() - now)*1000/(t+1) 

print "Average time for loaded model call: {:.2f}ms".format(elapsed) 

Der Ausgang: Average time for loaded model call: 180.34ms

Für BIG-Modelle sehe ich Vorhersagezeiten o Ver 10 Sekunden für einen einzelnen Anruf nach dem Laden eines gespeicherten Modells.

Antwort

3

Kurz gesagt: Nein, es scheint nicht etwas zu sein, das das ganze Modell zwischenspeichern könnte, da es keine RDD ist.


Yu können versuchen, cache() zu verwenden, aber Sie können das Modell selbst nicht cachen, weil es nicht ein RDD ist, so versuchen Sie dies:

model.productFeatures().cache() 
model.userFeatures().cache() 

Es zu unpersist() sie empfohlen wird, nachdem Sie don‘ Sie brauchen es nicht, vor allem, wenn Sie wirklich große Datenmengen verarbeiten, da Sie Ihren Job vor Fehlern aufgrund von Speichermangel schützen wollen.

Natürlich könnten Sie persist() statt cache() verwenden; Sie können lesen möchten: What is the difference between cache and persist?


Denken Sie daran, dass Funken macht Transformationenlazily, so, wenn Sie das wirklich passiert nichts Modell laden. Es benötigt eine Aktion, um tatsächliche Arbeit auszulösen (d. H., Wenn Sie wirklich die model verwenden, dann wird Spark versuchen, sie zu laden, was zu einer gewissen Latenz führt, im Gegensatz zu dem Speicher).

Beachten Sie auch, dass cache() ist faul, so dass Sie RDD.count() explizit in den Speicher laden können.


Experimente Ausgabe:

Average time for model call: 1518.83ms 
Average time for loaded model call: 2352.70ms 
Average time for loaded model call with my suggestions: 8886.61ms 

By the way, nach dem Laden des Modells, sollten Sie diese Art von Warnungen erhalten:

16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow. 
16/08/24 00:14:05 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow. 

Aber was, wenn ich tun der Zähltrick?Ich werde keine Verstärkung überhaupt bekommen, in der Tat werde ich langsamer:

... 
model.productFeatures().cache() 
model.productFeatures().count() 
model.userFeatures().cache() 
model.userFeatures().count() 
... 

Ausgang:

Average time for loaded model call: 13571.14ms 

Ohne die cache(), halten die count() s, ich habe:

Average time for loaded model call: 9312.01ms 

Wichtiger Hinweis: Das Timing wird in einem realen Cluster durchgeführt, in dem die Knoten wichtigen Aufgaben zugewiesen werden. Daher wurde mein Spielzeugbeispiel möglicherweise während der Experimente abgebrochen. Darüber hinaus können die Kommunikationskosten dominieren.

Also, wenn ich du wäre, würde ich die Experimente auch selbst durchführen.

Abschließend scheint es keinen anderen Mechanismus von Spark beim Zwischenspeichern Ihres Modells zu geben.

+0

Dank @gsamaras. 'cache()' war das erste, was ich ausprobiert habe. Leider können Modelle in PySpark nicht zwischengespeichert werden. Hier ist der Fehler: 'AttributeError: 'MatrixFactorizationModel' Objekt hat kein Attribut 'Cache''' – zbinsd

+0

Sie haben es nicht erwähnt. :/Das passiert, weil das Modell keine RDD ist. cache() funktioniert nur auf RDDs. BTW, nette Frage. Haben Sie 'model.productFeatures(). Cache()' und/oder 'model.userFeatures.cache()'? Da diese beiden gepaarte RDDs sind. – gsamaras

+0

@bzinsd verdammt, dachte ich hatte das bereits behoben! Überprüfen Sie meine aktualisierte Antwort! ;) – gsamaras