2016-05-18 5 views
1

Ich bin neu in Spark MLlib. Ich versuche das StreamingLogisticRegressionWithSGD-Modell zu implementieren. In Spark-Dokumenten sind dafür nur wenige Informationen verfügbar. Wenn ich 2,22-22-22 auf Steckdose Strom eingeben Ich erhalteWie kann ich Nein von Klassifikationsklassen/Labels für StreamingLogisticRegressionWithSGD konfigurieren

ERROR DataValidators: Classification labels should be 0 or 1. Found 1 invalid labels 

Ich verstehe, dass es mir erwartet Merkmale eingeben mit dem Label 0 oder 1, aber ich möchte wirklich wissen, ob ich es für weitere Etiketten konfigurieren können. Ich weiß nicht, wie die Anzahl der Klassen für die Klassifizierung von StreamingLogisticRegressionWithSGD festgelegt wird.

Danke!

-Code

package test; 

import java.util.List; 

import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.mllib.classification.StreamingLogisticRegressionWithSGD; 
import org.apache.spark.mllib.linalg.Vector; 
import org.apache.spark.mllib.linalg.Vectors; 
import org.apache.spark.mllib.regression.LabeledPoint; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.StreamingContext; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 

public class SLRPOC { 

    private static StreamingLogisticRegressionWithSGD slrModel; 

    private static int numFeatures = 3; 

    public static void main(String[] args) { 
     SparkConf sparkConf = new SparkConf().setMaster("local[3]").setAppName("SLRPOC"); 
     SparkContext sc = new SparkContext(sparkConf); 
     StreamingContext ssc = new StreamingContext(sc, Durations.seconds(10)); 
     JavaStreamingContext jssc = new JavaStreamingContext(ssc); 

     slrModel = new StreamingLogisticRegressionWithSGD().setStepSize(0.5).setNumIterations(10).setInitialWeights(Vectors.zeros(numFeatures)); 

     slrModel.trainOn(getDStreamTraining(jssc)); 
     slrModel.predictOn(getDStreamPrediction(jssc)).foreachRDD(new Function<JavaRDD<Double>, Void>() { 

      private static final long serialVersionUID = 5287086933555760190L; 

      @Override 
      public Void call(JavaRDD<Double> v1) throws Exception { 
       List<Double> list = v1.collect(); 
       for (Double d : list) { 
        System.out.println(d); 
       } 
       return null; 
      } 
     }); 

     jssc.start(); 
     jssc.awaitTermination(); 
    } 

    public static JavaDStream<LabeledPoint> getDStreamTraining(JavaStreamingContext context) { 
     JavaReceiverInputDStream<String> lines = context.socketTextStream("localhost", 9998); 

     return lines.map(new Function<String, LabeledPoint>() { 

      private static final long serialVersionUID = 1268686043314386060L; 

      @Override 
      public LabeledPoint call(String data) throws Exception { 
       System.out.println("Inside LabeledPoint call : ----- "); 
       String arr[] = data.split(","); 
       double vc[] = new double[3]; 
       String vcS[] = arr[1].split("-"); 
       int i = 0; 
       for (String vcSi : vcS) { 
        vc[i++] = Double.parseDouble(vcSi); 
       } 
       return new LabeledPoint(Double.parseDouble(arr[0]), Vectors.dense(vc)); 
      } 
     }); 
    } 

    public static JavaDStream<Vector> getDStreamPrediction(JavaStreamingContext context) { 
     JavaReceiverInputDStream<String> lines = context.socketTextStream("localhost", 9999); 

     return lines.map(new Function<String, Vector>() { 

      private static final long serialVersionUID = 1268686043314386060L; 

      @Override 
      public Vector call(String data) throws Exception { 
       System.out.println("Inside Vector call : ----- "); 
       String vcS[] = data.split("-"); 
       double vc[] = new double[3]; 
       int i = 0; 
       for (String vcSi : vcS) { 
        vc[i++] = Double.parseDouble(vcSi); 
       } 
       return Vectors.dense(vc); 
      } 
     }); 
    } 
} 

Exception

Innen LabeledPoint Anruf: ----- 16/05/18 17.51.10 INFO Zieher: Finished Aufgabe 0.0 in Stufe 4.0 (TID 4). 953 Bytes Ergebnis gesendet an Treiber 16.05.18 17:51:10 INFO TaskSetManager: Beendete Task 0.0 in Stufe 4.0 (TID 4) in 8 ms auf localhost (1/1) 16/05/18 17:51: 10 INFO TaskSchedulerImpl: Entfernte TaskSet 4.0, deren Aufgaben alle erledigt haben, vom Pool 16.05.18 17:51:10 INFO DAGScheduler: ResultStage 4 (trainOn at SLRPOC.java:33) beendet in 0,009 s 16/05/18 17:51:10 INFO DAGScheduler: Job 6 beendet: trainOn bei SLRPOC.java:33, genommen 019578 s 16/05/18 17:51:10 FEHLER DataValidators: Klassifizierungsbeschriftungen sollten 0 oder 1 sein. Gefunden 1 ungültige Labels 16/05/18 17:51:10 INFO JobScheduler: Starten von Job-Streaming-Job 1463574070000 ms.1 von Job Zeiteinstellung 1463574070000 ms 16/05/18 17:51:10 ERROR JobScheduler: Fehler beim Ausführen des Job-Streaming-Jobs 1463574070000 ms.0 org.apache.spark.SparkException: Eingabeüberprüfung fehlgeschlagen. bei org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run (GeneralizedLinearAlgorithm.scala: 251) bei org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.Apply (StreamingLinearAlgorithm.scala: 94 ) bei org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.Apply (StreamingLinearAlgorithm.scala: 92) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun 1 $ $$ anonfun $ anwenden $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream.scala: 42) um org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ anwenden $ mcV $ sp $ 1. apply (ForEachDStream.scala: 40) um org.apache.spark.streaming.dstream.ForEachDSt Rieses $$ anonfun $ 1 $$ anonfun $ gelten $ MCV $ sp $ 1.Apply (ForEachDStream.scala: 40) bei org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties (DStream.scala: 399) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Anwendung $ mcV $ sp (ForEachDStream.scala: 40) um org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Anwendung (ForEachDStream .scala: 40) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Apply (ForEachDStream.scala: 40) bei scala.util.Try $ .apply (Try.scala: 161) bei org.apache.spark.streaming.scheduler.Job.run (Job.scala: 34) unter org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.appl y $ mcV $ sp (Jobscheduler.scala: 207) bei org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ laufen $ 1.Apply (JobScheduler.scala: 207) bei org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.Anwendung (JobScheduler.scala: 207) bei scala.util.DynamicVariable.withValue (DynamicVariable.scala: 57) um org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler.run (JobScheduler.scala: 206) bei java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) bei java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:615) auf Java. lang.Thread.run (Thread.java:745) Ausnahme im Thread "main" org.apache.spark.SparkException: Eingabeüberprüfung fehlgeschlagen. bei org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run (GeneralizedLinearAlgorithm.scala: 251) bei org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.Apply (StreamingLinearAlgorithm.scala: 94 ) bei org.apache.spark.mllib.regression.StreamingLinearAlgorithm $$ anonfun $ trainOn $ 1.Apply (StreamingLinearAlgorithm.scala: 92) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun 1 $ $$ anonfun $ anwenden $ mcV $ sp $ 1.apply $ mcV $ sp (ForEachDStream.scala: 42) um org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1 $$ anonfun $ anwenden $ mcV $ sp $ 1. apply (ForEachDStream.scala: 40) um org.apache.spark.streaming.dstream.ForEachDSt Rieses $$ anonfun $ 1 $$ anonfun $ gelten $ MCV $ sp $ 1.Apply (ForEachDStream.scala: 40) bei org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties (DStream.scala: 399) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Anwendung $ mcV $ sp (ForEachDStream.scala: 40) um org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Anwendung (ForEachDStream .scala: 40) bei org.apache.spark.streaming.dstream.ForEachDStream $$ anonfun $ 1.Apply (ForEachDStream.scala: 40) bei scala.util.Try $ .apply (Try.scala: 161) bei org.apache.spark.streaming.scheduler.Job.run (Job.scala: 34) unter org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.appl y $ $ MCV sp (JobScheduler.scala: 207) bei org.apache.spark.streaming.scheduler.JobScheduler $ JobHandler $$ $ anonfun laufen $ 1.Apply (JobScheduler.scala: 207) bei org.apache. spark.streaming.scheduler.JobScheduler $ JobHandler $$ anonfun $ run $ 1.Anwendung (JobScheduler.scala: 207) bei scala.util.DynamicVariable.withValue (DynamicVariable.scala: 57) um org.apache.spark.streaming. scheduler.JobScheduler $ JobHandler.run (JobScheduler.scala: 206) bei java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) bei java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. Java: 615) bei java.lang.Thread.run (Thread.java:745) 16/05/18 17:51:10 INFO Streaming: Aufrufen stop (stopGracefully = false) vom Abschalten Haken 16/05/18 17.51.10 INFO SparkContext: Start Job: foreachRDD bei SLRPOC.java:34 16/05/18 17.51.10 INFO DAGScheduler : Job 7 beendet: foreachRDD bei SLRPOC.java:34, nahm 0.000020 s 16/05/18 17:51:10 INFO JobScheduler: Abgeschlossener Job Streaming-Job 1463574070000 ms.1 von Job Zeiteinstellung 1463574070000 ms 16.05.18 17:51:10 INFO ReceiverTracker: Gesendetes Stoppsignal an alle 2 Empfänger 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: Empfangenes Stoppsignal 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: Empfänger mit Nachricht stoppen: Gestoppt von Treiber: 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: Called Empfänger onStop 16/05/18 17:51:10 INFO ReceiverSupervisorImpl : Deregistrieren Empfänger 1 16/05/18 17.51.10 INFO ReceiverSupervisorImpl: Empfangene Stoppsignal 16/05/18 17.51.10 INFO ReceiverSupervisorImpl: Stoppen Empfänger mit message: Gestoppt durch Fahrer: 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: Called recept iver onStop 16/05/18 17:51:10 INFO ReceiverSupervisorImpl: Deregistering Empfänger 0 16/05/18 17:51:10 ERROR ReceiverTracker: Deregistrierte Empfänger für Stream 1: Gestoppt von Fahrer 16/05/18 17: INFO ReceiverSupervisorImpl 51:10: Gestoppt Empfänger 1 16/05/18 17.51.10 ERROR ReceiverTracker: abgemeldeten Empfänger für Strom 0: durch den Fahrer gestoppt

Antwort

2

Nicht sicher, ob Sie diese bereits herausgefunden, aber Ihre Verwenden eines binären Algorithmus, der nur 2 Klassifizierungen zulässt, 0 oder 1. Wenn Sie mehr haben möchten, müssen Sie einen multiplen Klassifizierungsalgorithmus verwenden

import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel} 
import org.apache.spark.mllib.evaluation.MulticlassMetrics 
new LogisticRegressionWithLBFGS().setNumClasses(10) 
+0

LogisticRegressionWithLBFGS ist hier nicht die Option, da es Online-Lernen nicht unterstützt, danke für die Pflege auf diesen alten Beitrag zu beantworten. –

Verwandte Themen