2016-03-20 4 views
0

In meinem Code habe ich mehrere Berechnungen, die ich auf drei verschiedenen Spalten separat ausführen, um die Varianz/Std/Mittelwert usw. zu berechnen ..... Das Problem ist, dass dies fair läuft lange, weil die Werte neu zugeordnet werden müssen und dann die Varianz für jede Spalte berechnet werden muss.Async Spark RDD Berechnung für Varianz/Std/Mittelwert

Ist es möglich, alle drei dieser Anweisungen asynchron gleichzeitig auszuführen und den endgültigen Wert in den 3 im Beispiel angegebenen Variablen abzurufen?

Code unten:

 final Double varSHOUR    = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getSHOUR(); 
     } 
    }).variance(); 
    final Double varHOURLYFRAMESIN  = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getHOURLYFRAMESIN(); 
     } 
    }).variance(); 
    final Double varHOURLYFRAMESOUT  = dataset.mapToDouble(new DoubleFunction<modelEhealth>() { 
     @Override 
     public double call(modelEhealth modelEhealth) throws Exception { 
      return modelEhealth.getHOURLYFRAMESOUT(); 
     } 
    }).variance(); 

Antwort

1

Du musst Spark Implementierung von JavaDoubleRDD.variance() mit Ihrem ModelHealth Klasse statt Doppel imitieren. Dies ist nicht zu schwer, da Sie Sparks StatCounter verwenden können, um die tatsächlichen Berechnungen durchzuführen, Sie werden nur 3 von ihnen benötigen.

Zum Beispiel werde ich eine einfache ModelHealth mit 3 Double Felder v1 verwenden, v2, v3:

static class ModelHealth { 
    final Double v1; 
    final Double v2; 
    final Double v3; 
} 

Dann:

JavaRDD<ModelHealth> dataset = // your data 

// zero value - three empty StatCounters: 
final Tuple3<StatCounter, StatCounter, StatCounter> zeroValue = new Tuple3<>(new StatCounter(), new StatCounter(), new StatCounter()); 

// using `aggregate` to aggregate ModelHealth records into three StatCounters: 
final Tuple3<StatCounter, StatCounter, StatCounter> stats = dataset.aggregate(zeroValue, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, ModelHealth, Tuple3<StatCounter, StatCounter, StatCounter>>() { 
    @Override 
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> stats, ModelHealth record) throws Exception { 
     // merging record into tuple of StatCounters - each value merged with corresponding counter 
     stats._1().merge(record.v1); 
     stats._2().merge(record.v2); 
     stats._3().merge(record.v3); 
     return stats; 
    } 
}, new Function2<Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>, Tuple3<StatCounter, StatCounter, StatCounter>>() { 
    @Override 
    public Tuple3<StatCounter, StatCounter, StatCounter> call(Tuple3<StatCounter, StatCounter, StatCounter> v1, Tuple3<StatCounter, StatCounter, StatCounter> v2) throws Exception { 
     // merging tuples of StatCounters - each counter merged with corresponding one 
     v1._1().merge(v2._1()); 
     v1._2().merge(v2._2()); 
     v1._3().merge(v2._3()); 
     return v1; 
    } 
}); 

Double v1_variance = stats._1().variance(); 
Double v2_variance = stats._2().variance(); 
Double v3_variance = stats._3().variance(); 

Dies das gleiche Ergebnis haben Sie gehabt, aber mit einer einzigen Aggregation über den Datensatz.

+0

Wenn Sie versuchen, Function2 für die Aggregation zu implementieren, müssen Sie einen Fehler vom Compiler im zweiten Teil des Codes erhalten, der als Abstract deklariert werden muss oder eine abstrakte Methode implementieren muss. – user2100493

+0

Es kompiliert und läuft erfolgreich für mich - haben Sie die "Aufruf" -Methodensignatur geändert? Stellen Sie sicher, dass es Super-Eingabe entspricht und Rückgabetypen sollten mit der Typdefinition übereinstimmen. –