2017-04-11 5 views
2

Ich versuche, ein Multinomiale logistische Regression ModellSpark-Pipeline Fehler

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('prepare_data').getOrCreate() 

from pyspark.sql.types import * 
spark.sql("DROP TABLE IF EXISTS customers") 
spark.sql("CREATE TABLE customers (
      Customer_ID DOUBLE, 
      Name STRING, 
      Gender STRING, 
      Address STRING, 
      Nationality DOUBLE, 
      Account_Type STRING, 
      Age DOUBLE, 
      Education STRING, 
      Employment STRING, 
      Salary DOUBLE, 
      Employer_Stability STRING, 
      Customer_Loyalty DOUBLE, 
      Balance DOUBLE, 
      Residential_Status STRING, 
      Service_Level STRING)") 
spark.sql("LOAD DATA LOCAL INPATH '../datasets/dummyTrain.csv' INTO TABLE 
      customers") 

dataset = spark.table("customers") 
cols = dataset.columns 
display(dataset) 

from pyspark.ml import Pipeline 
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler 

categoricalColumns = ["Education", "Employment", "Employer_Stability", 
         "Residential_Status"] 
stages = [] 

for categoricalCol in categoricalColumns: 
    stringIndexer = StringIndexer(inputCol=categoricalCol, 
     outputCol=categoricalCol+"Index") 
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", 
     outputCol=categoricalCol+"classVec") 
    stages += [stringIndexer, encoder] 

label_stringIdx = StringIndexer(inputCol = "Service_Level", outputCol = 
    "label") 
stages += [label_stringIdx] 

numericCols = ["Age", "Salary", "Customer_Loyalty", "Balance"] 
assemblerInputs = map(lambda c: c + "classVec", categoricalColumns) + 
    numericCols 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") 
stages += [assembler] 

pipeline = Pipeline(stages=stages) 
pipelineModel = pipeline.fit(dataset) 
dataset = pipelineModel.transform(dataset) 
selectedcols = ["label", "features"] + cols 
dataset = dataset.select(selectedcols) 
display(dataset) 

Ich erhalte die folgende Fehlermeldung auszuführen:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-31-07d2fb5cecc8> in <module>() 
     4 # - fit() computes feature statistics as needed 
     5 # - transform() actually transforms the features 
----> 6 pipelineModel = pipeline.fit(dataset) 
     7 dataset = pipelineModel.transform(dataset) 
     8 

/srv/spark/python/pyspark/ml/base.py in fit(self, dataset, params) 
    62     return self.copy(params)._fit(dataset) 
    63    else: 
---> 64     return self._fit(dataset) 
    65   else: 
    66    raise ValueError("Params must be either a param map or a 
list/tuple of param maps, " 

/srv/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset) 
    109      transformers.append(model) 
    110      if i < indexOfLastEstimator: 
--> 111       dataset = model.transform(dataset) 
    112    else: 
    113     transformers.append(stage) 

/srv/spark/python/pyspark/ml/base.py in transform(self, dataset, params) 
    103     return self.copy(params)._transform(dataset) 
    104    else: 
--> 105     return self._transform(dataset) 
    106   else: 
    107    raise ValueError("Params must be a param map but got 
%s." % type(params)) 

/srv/spark/python/pyspark/ml/wrapper.py in _transform(self, dataset) 
    250  def _transform(self, dataset): 
    251   self._transfer_params_to_java() 
--> 252   return DataFrame(self._java_obj.transform(dataset._jdf), 
dataset.sql_ctx) 
    253 
    254 

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
__call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/srv/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling o798.transform. 
: java.lang.NullPointerException at 

ich es versäumt haben, um herauszufinden, was habe ich falsch gemacht und scheint Das Problem kann auf der transform() -Methode sein. Jede Hilfe wäre willkommen.

+1

Sind Sie sicher, dass Sie keine fehlenden Werte haben? Es gibt eine NullPointerException zurück, die darauf hinweist, dass einer Ihrer Transformer die Daten nicht ordnungsgemäß verarbeiten kann. – TDrabas

+0

@TDrabas das Dataset gab NULL für alle Zeilen zurück. Vielen Dank. Jetzt habe ich meinen Code geändert, eine temporäre Ansicht erstellt und daraus meine Spalten ausgewählt. Wenn ich die Pipeline ausführen und die Transformationen erstellen möchte, bekomme ich jetzt eine 'IllegalArgumentException: u'Data type StringType wird nicht unterstützt. '' Fehler – maffsojah

+1

Kannst du die Debug-Informationen einfügen? Definitiv scheint eines Ihrer Eingabe-Features eine Zeichenkette zu sein, während der 'VectorAssembler' numerische Spalten benötigt. Überprüfen Sie das Schema der Ansicht, die Sie gerade mit 'printSchema()' erstellt haben, um zu sehen, ob die numerischen Spalten nicht irgendwie in Zeichenfolgen umgewandelt wurden. BTW, wenn Sie 'encoder' erstellen, könnten Sie in Erwägung ziehen, die' inputCol' als 'StringIndexer.getOuputCol()' – TDrabas

Antwort

2

Sie müssen sicherstellen, dass keine fehlenden Werte in Ihren Daten vorhanden sind - deshalb erhalten Sie die NullPointerException. Stellen Sie außerdem sicher, dass alle Eingabefunktionen für VectorAssembler numerisch sind.

BTW, wenn Sie Encoder erstellen, können Sie die inputCol als StringIndexer.getOuputCol() angeben.