1

Ich versuche L1 Normalisierung für die Werte einer Spalte in einem Datenrahmen mit pyspark ML Bibliothek. Folgendes ist mein Code. Aber es geht nicht. Können Sie mir bitte helfen herauszufinden, was mit diesem Code nicht stimmt?Normalisieren einer Spalte von Datenrahmen pyspark ML

from pyspark.ml.feature import Normalizer 

y = range(1,10) 
data = spark.createDataFrame([[float(e), ] for e in y]) 
#data.select('_1').show() 

normalizer = Normalizer(p=1.0, inputCol="_1", outputCol="features") 
data2 = normalizer.transform(data) 
data2.select("features").show() 

Folgendes ist ein Teil des Fehlerprotokolls.

Py4JJavaError: An error occurred while calling o857.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Task 0 in stage 36.0 failed 4 times, most recent failure: Lost task 0.3 
in stage 36.0 (TID 67, XXXXX.serveraddress.com): 
org.apache.spark.SparkException: Failed to execute user defined 
function($anonfun$createTransformFunc$1: (double) => vector) 

Antwort

1

Normalizer wird verwendet für die Rows zu normalisieren Vectors, nicht Skalare über die Spalten.

Um L1 Skala eine oder mehrere skalare Spalten können Sie versuchen:

data.select([ 
    (data[c]/s).alias(c) 
    for c, s in zip(data.columns, data.groupBy().sum().first()) 
]) 

## +--------------------+ 
## |     _1| 
## +--------------------+ 
## |0.022222222222222223| 
## |0.044444444444444446| 
## | 0.06666666666666667| 
## | 0.08888888888888889| 
## | 0.1111111111111111| 
## | 0.13333333333333333| 
## | 0.15555555555555556| 
## | 0.17777777777777778| 
## |     0.2| 
## +--------------------+ 

aber hüte dich vor möglichen Überlauf und numerische Genauigkeit Probleme.

Sie können natürlich auch eine Column mit Pipeline API skalieren:

from pyspark.ml.feature import SQLTransformer 

stf = SQLTransformer(statement=""" 
    WITH norm AS (SELECT SUM({inputCol}) L1 FROM __THIS__) 
    SELECT /*+ MAPJOIN(norm) +*/ 
     __THIS__.*, __THIS__.{inputCol}/norm.L1 {outputCol} 
    FROM __THIS__ CROSS JOIN norm 
    """.format(inputCol="_1", outputCol="_1_scaled")) 

stf.transform(data) 

## +---+--------------------+ 
## | _1|   _1_scaled| 
## +---+--------------------+ 
## |1.0|0.022222222222222223| 
## |2.0|0.044444444444444446| 
## |3.0| 0.06666666666666667| 
## |4.0| 0.08888888888888889| 
## |5.0| 0.1111111111111111| 
## |6.0| 0.13333333333333333| 
## |7.0| 0.15555555555555556| 
## |8.0| 0.17777777777777778| 
## |9.0|     0.2| 
## +---+--------------------+ 
Verwandte Themen