2017-03-09 2 views
0

Ich baue derzeit ein Benchmark-Tool für einige verteilte Verarbeitungstools und habe Probleme mit Apache Flink.Reduzieren Sie auf Pojo-Feld mit Apache Flink mit Java

Die Einrichtung ist einfach: LogPojo ist ein einfaches Pojo mit drei Feldern (langes Datum, doppelter Wert, String-Daten). Aus einer Liste suche ich nach dem einen LogPojo mit dem minimalen "Wert" -Feld. Grundsätzlich ist das Äquivalent zu:

pojoList.stream().min(new LogPojo.Comp()).get().getValue(); 

Mein flink Setup wie folgt aussieht:

public double processLogs(List<LogPojo> logs) { 

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<LogPojo> logSet = env.fromCollection(logs); 

    double result = 0.0; 
    try { 
     ReduceOperator ro = logSet.reduce(new LogReducer()); 
     List<LogPojo> c = ro.collect(); 
     result = c.get(0).getValue(); 
    } catch (Exception ex) { 
     System.out.println("Exception caught" + ex); 
    } 

    return result; 
} 

public class LogReducer implements ReduceFunction<LogPojo> { 

    @Override 
    public LogPojo reduce(LogPojo o1, LogPojo o2) { 
     return (o1.getValue() < o2.getValue()) ? o1 : o2; 
    } 
} 

Es hört mit:

Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet; 

Also irgendwie scheint es nicht in der Lage sein, um die Funktion reduzieren gilt. Ich kann einfach nicht finden, warum. Irgendwelche Hinweise?

Antwort

1

Zuerst sollten Sie Ihre Importe überprüfen. Sie erhalten eine Ausnahme von einer Scala-Klasse, aber Ihr Programm ist in Java implementiert. Möglicherweise haben Sie die Scala DataSet-API versehentlich importiert. Die Verwendung der Java-API sollte nicht zu einer Scala-Ausnahme führen (es sei denn, Sie verwenden Klassen, die von Scala abhängen).

Unabhängig davon Flink verfügt über ein integriertes in Aggregationsverfahren für min, max usw.

DataSet<LogPojo> logSet = env.fromCollection(logs); 
// map LogPojo to a Tuple1<Double> 
// (Flink's built-in aggregation functions work only on Tuple types) 
DataSet<Tuple1<Double>> values = logSet.map(new MapFunction<LogPojo, Tuple1<Double>>() { 
    @Override 
    public Tuple1<Double> map(LogPojo l) throws Exception { 
     return new Tuple1<>(l.value); 
    } 
    }); 
// fetch the min value (at position 0 in the Tuple) 
List<Tuple1<Double>> c = values.min(0).collect(); 
// get the first field of the Tuple 
Double minVal = c.get(0).f0; 
+0

Danke, der Hinweis über den Import löste es. Es scheint, dass eine der Funkenskala-Abhängigkeiten Verwüstung angerichtet hat. Sobald ich das Funkenmodul deaktiviert habe, funktioniert es. –