Ich möchte Daten mit StandardScaler
(from pyspark.mllib.feature import StandardScaler
) skalieren, jetzt kann ich es tun, indem Sie die Werte von RDD an die Funktion übergeben, aber das Problem ist, dass ich den Schlüssel erhalten möchte. Gibt es trotzdem, dass ich meine Daten skaliere, indem ich ihren Schlüssel bewahre?Ist es möglich, Daten nach Gruppe in Spark zu skalieren?
Beispieldatensatz
0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.
0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,smurf.
Imports
import sys
import os
from collections import OrderedDict
from numpy import array
from math import sqrt
try:
from pyspark import SparkContext, SparkConf
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.feature import StandardScaler
from pyspark.statcounter import StatCounter
print ("Successfully imported Spark Modules")
except ImportError as e:
print ("Can not import Spark Modules", e)
sys.exit(1)
Codeabschnitt
sc = SparkContext(conf=conf)
raw_data = sc.textFile(data_file)
parsed_data = raw_data.map(Parseline)
Parseline
Funktion:
def Parseline(line):
line_split = line.split(",")
clean_line_split = [line_split[0]]+line_split[4:-1]
return (line_split[-1], array([float(x) for x in clean_line_split]))
Wenn ich diesen Code verwenden möchten es mir diese Störung „Typeerror gibt: Unbound-Methode merge() muss mit NoneType-Instanz als erstes -Argument aufgerufen werden (statt StatCounter-Instanz) ", haben Sie eine Idee? – Iman
Fehlende Werte in Ihren Daten? Was sind die Typen? – zero323
die Struktur der Daten ist etwas wie das [Label, Array ([Liste der numerischen Float-Werte]), jedes Etikett ist normal oder Angriff und keine fehlenden Werte – Iman