2017-02-06 5 views
1

Wir haben eine Pipeline (2.0.1), die aus mehreren Feature Transformation Phasen besteht.Spark: OneHot-Encoder und Speichern von Pipeline (Feature Dimension Problem)

Einige dieser Stufen sind OneHot-Encoder. Idee: Klassifizieren Sie eine Integer-basierte Kategorie in n unabhängige Features.

Wenn Sie das Pipeline-Modell trainieren und es zur Vorhersage aller Funktionen verwenden. Das gespeicherte Pipeline-Modell zu speichern und neu zu laden, verursacht jedoch Probleme:

Der gespeicherte "trainierte" OneHot-Encoder verfolgt nicht, wie viele Kategorien vorhanden sind. Das Laden dieser Funktion führt nun zu Problemen: Wenn das geladene Modell zur Vorhersage verwendet wird, ermittelt es erneut, wie viele Kategorien vorhanden sind, wodurch der Trainings-Feature-Space und der Vorhersage-Feature-Space eine andere Größe (Dimension) aufweisen. Siehe Beispiel-Code unten wie in einem Zeppelin Notebook laufen:

import org.apache.spark.ml.feature.OneHotEncoder 
import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.PipelineModel 


// Specifying two test samples, one with class 5 and one with class 3. This is OneHot encoded into 5 boolean features (sparse vector) 
// Adding a 'filler' column because createDataFrame doesnt like single-column sequences and this is the easiest way to demo it ;) 
val df = spark.createDataFrame(Seq((5, 1), (3, 1))).toDF("class", "filler") 

val enc = new OneHotEncoder() 
    .setInputCol("class") 
    .setOutputCol("class_one_hot") 

val pipeline = new Pipeline() 
    .setStages(Array(enc)) 

val model = pipeline.fit(df) 
model.transform(df).show() 

/* 
+-----+------+-------------+ 
|class|filler|class_one_hot| 
+-----+------+-------------+ 
| 5|  1|(5,[],[]) | 
| 3|  1|(5,[3],[1.0])| 
+-----+------+-------------+ 

Note: Vector of size 5 
*/ 

model.write.overwrite().save("s3a://one-hot") 

val loadedModel = PipelineModel.load("s3a://one-hot") 

val df2 = spark.createDataFrame(Seq((3, 1))).toDF("class", "output") // When using the trained model our input consists of one row (prediction engine style). The provided category for the prediction feature set is category 3 
loadedModel.transform(df2).show() 

/* 
+-----+------+-------------+ 
|class|output|class_one_hot| 
+-----+------+-------------+ 
| 3|  1|(3,[],[]) | 
+-----+------+-------------+ 

Note: Incompatible vector of size 3 
*/ 

ich lieber nicht meinen eigenen OneHot Encoder machen, die diese Serialisierung nicht unterstützt, gibt es Alternativen, die ich aus der Box verwenden kann?

Antwort

1

Sie verwenden OneHotEncoder nicht so, wie es verwendet werden soll. OneHotEncoder ist ein Transofrmer kein Estimator. Es speichert keine Informationen über die Ebenen, sondern hängt von den Column Metadaten ab, um die Ausgabedimensionen zu bestimmen. Wenn Metadaten fehlen, wie in Ihrem Fall, verwendet es Fallback-Strategie und geht davon aus, dass es max(input_column) Ebenen gibt. Die Serialisierung ist hier irrelevant.

Typische Verwendung umfasst Transformers im Upstream Pipeline, die Metadaten für Sie festlegen. Ein übliches Beispiel ist StringIndexer.

Es ist immer noch möglich, Metadaten manuell einstellen, aber es ist mehr beteiligt: ​​

import org.apache.spark.ml.attribute.NominalAttribute 

val meta = NominalAttribute.defaultAttr 
    .withName("class") 
    .withValues("0", (1 to 5).map(_.toString): _*) 
    .toMetadata 

loadedModel.transform(df2.select($"class".as("class", meta), $"output")) 

Ähnlich ist es in Python (Bedürfnisse Spark> = 2.2):

from pyspark.sql.functions import col 

meta = {"ml_attr": { 
    "vals": [str(x) for x in range(6)], # Provide a set of levels 
    "type": "nominal", 
    "name": "class"}} 

loaded.transform(
    df.withColumn("class", col("class").alias("class", metadata=meta)) 
) 

Metadaten können auch angebracht werden unter Verwendung von eine Anzahl verschiedener Methoden: How to change column metadata in pyspark?.

Verwandte Themen