2016-04-01 15 views
1

Ich möchte Daten mit StandardScaler (from pyspark.mllib.feature import StandardScaler) skalieren, jetzt kann ich es tun, indem Sie die Werte von RDD an die Funktion übergeben, aber das Problem ist, dass ich den Schlüssel erhalten möchte. Gibt es trotzdem, dass ich meine Daten skaliere, indem ich ihren Schlüssel bewahre?Ist es möglich, Daten nach Gruppe in Spark zu skalieren?

Beispieldatensatz

0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal. 
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf. 

Imports

import sys 
import os 
from collections import OrderedDict 
from numpy import array 
from math import sqrt 
try: 
    from pyspark import SparkContext, SparkConf 
    from pyspark.mllib.clustering import KMeans 
    from pyspark.mllib.feature import StandardScaler 
    from pyspark.statcounter import StatCounter 

    print ("Successfully imported Spark Modules") 
except ImportError as e: 
    print ("Can not import Spark Modules", e) 
    sys.exit(1) 

Codeabschnitt

sc = SparkContext(conf=conf) 
    raw_data = sc.textFile(data_file) 
    parsed_data = raw_data.map(Parseline) 

Parseline Funktion:

def Parseline(line): 
    line_split = line.split(",") 
    clean_line_split = [line_split[0]]+line_split[4:-1] 
    return (line_split[-1], array([float(x) for x in clean_line_split])) 

Antwort

3

Nicht gerade eine schöne Lösung, aber Sie können meine Antwort auf the similar Scala question anpassen. Fangen wir mit einem Beispiel Daten beginnen:

import numpy as np 

np.random.seed(323) 

keys = ["foo"] * 50 + ["bar"] * 50 
values = (
    np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) + 
    np.random.rand(100, 10) 
) 

rdd = sc.parallelize(zip(keys, values)) 

Leider MultivariateStatisticalSummary um eine JVM-Modell nur ein Wrapper ist, und es ist nicht wirklich Python freundlich. Zum Glück mit NumPy Array können wir Standard StatCounter verwenden, um Statistiken Schlüssel zu berechnen:

from pyspark.statcounter import StatCounter 

def compute_stats(rdd): 
    return rdd.aggregateByKey(
     StatCounter(), StatCounter.merge, StatCounter.mergeStats 
    ).collectAsMap() 

Endlich können wir map zu normalisieren:

def scale(rdd, stats): 
    def scale_(kv): 
     k, v = kv 
     return (v - stats[k].mean())/stats[k].stdev() 
    return rdd.map(scale_) 

scaled = scale(rdd, compute_stats(rdd)) 
scaled.first() 

## array([ 1.59879188, -1.66816084, 1.38546532, 1.76122047, 1.48132643, 
## 0.01512487, 1.49336769, 0.47765982, -1.04271866, 1.55288814]) 
+0

Wenn ich diesen Code verwenden möchten es mir diese Störung „Typeerror gibt: Unbound-Methode merge() muss mit NoneType-Instanz als erstes -Argument aufgerufen werden (statt StatCounter-Instanz) ", haben Sie eine Idee? – Iman

+0

Fehlende Werte in Ihren Daten? Was sind die Typen? – zero323

+0

die Struktur der Daten ist etwas wie das [Label, Array ([Liste der numerischen Float-Werte]), jedes Etikett ist normal oder Angriff und keine fehlenden Werte – Iman

Verwandte Themen