2016-08-03 11 views
1

Ich versuche derzeit einige Algorithmen in Apache Spark und Apache Flink zu implementieren. Bei der Ausführung der Algorithmen muss ich eine Art von Differenz-/Subtraktionsoperationen durchführen.Apache Flink DataSet Differenz-/Subtraktionsoperation

Während es eine eingebaute subtract Operation für Apache Spark gibt, konnte ich etwas ähnliches in Apache Flink (1.0.3 und 1.1.0-SNAPSHOT) nicht finden.

Also meine Frage ist, da zwei Datensatzobjekte d1, d2 beide die gleiche Art enthält T, was ist der effizienteste Weg gesetzt Differenz anwenden, das heißt d1\d2?

val d1: DataSet[T] = ... 
val d2: DataSet[T] = ... 
val d_diff: DataSet[T] = ??? 

Wahrscheinlich ist es eine Möglichkeit, um es über coGroup

val d_diff = d1.coGroup(d2).where(0).equalTo(0) { 
       (l, r, out: Collector[T]) => { 
       val rightElements = r.toSet 
       for (el <- l) 
        if (!rightElements.contains(el)) out.collect(el) 
       } 
      } 

aber ich frage mich, ob das stimmt und Best-Practice oder nicht weiß jemand etwas effizienter?

Antwort

2

Die DataSet-API bietet keine Methoden dafür, da sie nur die grundlegenden Operationen enthält. Die Tabellen-API in 1.1 hat einen gesetzten Minus-Operator. Sie können sehen, wie es implementiert ist here.

leftDataSet 
    .coGroup(rightDataSet) 
    .where("*") 
    .equalTo("*") 
    .`with`(coGroupFunction) 

Mit this CoGroupFunction. Also ja, du bist auf dem richtigen Weg.

Verwandte Themen