Ich baue derzeit ein Benchmark-Tool für einige verteilte Verarbeitungstools und habe Probleme mit Apache Flink.Reduzieren Sie auf Pojo-Feld mit Apache Flink mit Java
Die Einrichtung ist einfach: LogPojo ist ein einfaches Pojo mit drei Feldern (langes Datum, doppelter Wert, String-Daten). Aus einer Liste suche ich nach dem einen LogPojo mit dem minimalen "Wert" -Feld. Grundsätzlich ist das Äquivalent zu:
pojoList.stream().min(new LogPojo.Comp()).get().getValue();
Mein flink Setup wie folgt aussieht:
public double processLogs(List<LogPojo> logs) {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<LogPojo> logSet = env.fromCollection(logs);
double result = 0.0;
try {
ReduceOperator ro = logSet.reduce(new LogReducer());
List<LogPojo> c = ro.collect();
result = c.get(0).getValue();
} catch (Exception ex) {
System.out.println("Exception caught" + ex);
}
return result;
}
public class LogReducer implements ReduceFunction<LogPojo> {
@Override
public LogPojo reduce(LogPojo o1, LogPojo o2) {
return (o1.getValue() < o2.getValue()) ? o1 : o2;
}
}
Es hört mit:
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashSet;
Also irgendwie scheint es nicht in der Lage sein, um die Funktion reduzieren gilt. Ich kann einfach nicht finden, warum. Irgendwelche Hinweise?
Danke, der Hinweis über den Import löste es. Es scheint, dass eine der Funkenskala-Abhängigkeiten Verwüstung angerichtet hat. Sobald ich das Funkenmodul deaktiviert habe, funktioniert es. –