
How do I fit two numpy matrices with PySpark's SVM?

I have two numpy matrices, as follows:

Features: 
    (878049, 6) 
    <type 'numpy.ndarray'> 

Labels: 
    (878049,) 
    <type 'numpy.ndarray'> 

I was curious whether PySpark's random forests can fit the matrices mentioned above. From the documentation, the RF algorithm can be used as follows:

model = RandomForest.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     numTrees=3, featureSubsetStrategy="auto",
                                     impurity='gini', maxDepth=4, maxBins=32)

# Evaluate model on test instances and compute test error 
predictions = model.predict(testData.map(lambda x: x.features)) 

So my question is: do I need to transform the numpy arrays into an RDD, and if not, into what format should I convert the features and labels matrices in order to fit them with MLlib's RF implementation?

Update: following @CafeFeed's answer, I tried the following:

In [24]:

    # CV
    (trainingData, testData) = data.randomSplit([0.7, 0.3])

In [26]:

    from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
    from pyspark.mllib.util import MLUtils
    import numpy as np

    # Train a DecisionTree model.
    # Empty categoricalFeaturesInfo indicates all features are continuous.
    model = DecisionTree.trainClassifier(trainingData, numClasses=np.unique(y),
                                         categoricalFeaturesInfo={},
                                         impurity='gini', maxDepth=5, maxBins=32)

    # Evaluate model on test instances and compute test error
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count()/float(testData.count())
    print('Test Error = ' + str(testErr))
    print('Learned classification tree model:')
    print(model.toDebugString())

But I got this exception:

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-27-ded4b074521b> in <module>() 
     6 # Empty categoricalFeaturesInfo indicates all features are continuous. 
     7 
----> 8 model = DecisionTree.trainClassifier(trainingData, numClasses=np.unique(y), categoricalFeaturesInfo={},impurity='gini', maxDepth=5, maxBins=32) 
     9 
    10 # Evaluate model on test instances and compute test error 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/tree.pyc in trainClassifier(cls, data, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain) 
    183   """ 
    184   return cls._train(data, "classification", numClasses, categoricalFeaturesInfo, 
--> 185       impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain) 
    186 
    187  @classmethod 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/tree.pyc in _train(cls, data, type, numClasses, features, impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain) 
    124   assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint" 
    125   model = callMLlibFunc("trainDecisionTreeModel", data, type, numClasses, features, 
--> 126        impurity, maxDepth, maxBins, minInstancesPerNode, minInfoGain) 
    127   return DecisionTreeModel(model) 
    128 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in callMLlibFunc(name, *args) 
    128  sc = SparkContext._active_spark_context 
    129  api = getattr(sc._jvm.PythonMLLibAPI(), name) 
--> 130  return callJavaFunc(sc, api, *args) 
    131 
    132 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in callJavaFunc(sc, func, *args) 
    120 def callJavaFunc(sc, func, *args): 
    121  """ Call Java Function """ 
--> 122  args = [_py2java(sc, a) for a in args] 
    123  return _java2py(sc, func(*args)) 
    124 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/mllib/common.pyc in _py2java(sc, obj) 
    86  else: 
    87   data = bytearray(PickleSerializer().dumps(obj)) 
---> 88   obj = sc._jvm.SerDe.loads(data) 
    89  return obj 
    90 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    536   answer = self.gateway_client.send_command(command) 
    537   return_value = get_return_value(answer, self.gateway_client, 
--> 538     self.target_id, self.name) 
    539 
    540   for temp_arg in temp_args: 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/utils.pyc in deco(*a, **kw) 
    34  def deco(*a, **kw): 
    35   try: 
---> 36    return f(*a, **kw) 
    37   except py4j.protocol.Py4JJavaError as e: 
    38    s = e.java_exception.toString() 

/usr/local/Cellar/apache-spark/1.5.1/libexec/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.mllib.api.python.SerDe.loads. 
: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct) 
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) 
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:701) 
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:171) 
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:85) 
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:98) 
    at org.apache.spark.mllib.api.python.SerDe$.loads(PythonMLLibAPI.scala:1462) 
    at org.apache.spark.mllib.api.python.SerDe.loads(PythonMLLibAPI.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 
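A note on this traceback: the SerDe.loads failure on numpy.core.multiarray._reconstruct is the usual symptom of passing a numpy object where MLlib expects a plain Python value. Here numClasses=np.unique(y) hands the JVM a numpy array instead of an int. A minimal sketch of the likely fix, assuming y is the labels array from the question:

    import numpy as np

    # numClasses must be a plain Python int; np.unique(y) returns a numpy
    # array, which py4j cannot deserialize on the JVM side.
    numClasses = len(np.unique(y))

    model = DecisionTree.trainClassifier(trainingData, numClasses=numClasses,
                                         categoricalFeaturesInfo={},
                                         impurity='gini', maxDepth=5, maxBins=32)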

Answer


The docs are clear: you need an RDD of LabeledPoint:

>>> from pyspark.mllib.regression import LabeledPoint 
>>> from pyspark.mllib.tree import RandomForest 
>>> import numpy as np 
>>> 
>>> np.random.seed(1) 
>>> features = np.random.random((100, 10)) 
>>> labels = np.random.choice([0, 1], 100) 
>>> data = sc.parallelize(zip(labels, features)).map(lambda x: LabeledPoint(x[0], x[1])) 
>>> RandomForest.trainClassifier(data, numClasses=2, categoricalFeaturesInfo={}, numTrees=2) 
TreeEnsembleModel classifier with 2 trees 
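Applied to the question's data, a minimal end-to-end sketch (assuming features is the (878049, 6) array, labels the (878049,) array, and sc an active SparkContext):

    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.tree import RandomForest
    import numpy as np

    # Pair each label with its feature row and wrap the pair in a LabeledPoint
    data = sc.parallelize(zip(labels, features)).map(
        lambda x: LabeledPoint(float(x[0]), x[1]))

    (trainingData, testData) = data.randomSplit([0.7, 0.3])

    model = RandomForest.trainClassifier(trainingData,
                                         numClasses=len(np.unique(labels)),
                                         categoricalFeaturesInfo={},
                                         numTrees=3)

    # Test error, computed as in the question
    predictions = model.predict(testData.map(lambda x: x.features))
    labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
    testErr = (labelsAndPredictions.filter(lambda vp: vp[0] != vp[1]).count()
               / float(testData.count()))
    print('Test Error = ' + str(testErr))

At 878k rows it may also be worth calling data.cache() before the split, so the RDD is not recomputed for each pass over the training data.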