Flink Version zurückgibt: 1.3.1Apache Flink: LEFT JOIN mit einem TableFunction nicht erwartetes Ergebnis
ich zwei Tabellen erstellt, ein aus dem Speicher, ein anderes ist von UDTF. Als ich Join und Left Join getestet habe, haben sie dasselbe Ergebnis zurückgegeben. Was ich erwartet hatte, war übrig, hatte mehr Reihen als beitreten.
Mein Testcode ist dies:
public class ExerciseUDF {
public static void main(String[] args) throws Exception {
test_3();
}
public static void test_3() throws Exception {
// 1. set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1));
// 2. register the DataSet as table "WordCount"
tEnv.registerDataSet("WordCount", input, "word, frequency");
Table table;
DataSet<WC> result;
DataSet<WCUpper> resultUpper;
table = tEnv.scan("WordCount");
// 3. table left join user defined table
System.out.println("table left join user defined table");
tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2());
table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, S.word as myupper FROM WordCount as S left join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word");
resultUpper = tEnv.toDataSet(table, WCUpper.class);
resultUpper.print(); // out put —— WCUpper Ciao 1 CIAO, however, without the row having Hello
// 4. table join user defined table
System.out.println("table join user defined table");
tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2());
table = tEnv.scan("WordCount");
table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, T.myupper as myupper FROM WordCount as S join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word"
);
resultUpper = tEnv.toDataSet(table, WCUpper.class);
resultUpper.print();
}
public static class WC {
public String word;
public long frequency;
// public constructor to make it a Flink POJO
public WC() {
}
public WC(String word, long frequency) {
this.word = word;
this.frequency = frequency;
}
@Override
public String toString() {
return "WC " + word + " " + frequency;
}
}
// user defined table function
public static class MyTableFunc_2 extends TableFunction<Tuple2<String,String>>{
public void eval(String str){ // hello --> hello HELLO
System.out.println("upper func executed for "+str);
if(str.equals("Hello")){
return;
}
collect(new Tuple2<String,String>(str,str.toUpperCase()));
// collect(new Tuple2<String,String>(str,str.toUpperCase()));
}
}
}
Der Ausgang des linken verbinden und Join-Abfragen die gleichen sind. In beiden Fällen wird nur eine Zeile zurückgegeben.
WCUpper Ciao 1 CIAO
Aber ich denke, dass die linke Abfrage Reihen der 'Hallo' bewahren beitreten sollte.
Ich möchte wissen, welche Zeilen übereinstimmen und nicht übereinstimmen zwischen 'S' und' T'. Solch ein Ergebnis scheint nicht erreicht zu werden, indem einfach "ON TRUE" -Bedingung verwendet wird. Gibt es eine Möglichkeit, dies zu tun? Außerdem verhält sich mysql in join ... on ... anders, eine ''Hello''-Zeile wird trotzdem zurückgegeben, obwohl Join fehlgeschlagen ist. In diesem Zustand können einige Spalten' null 'sein https: //dev.mysql. com/doc/refman/5.7/de/left-join-optimization.html _Wenn es eine Zeile in A gibt, die der WHERE-Klausel entspricht, aber keine Zeile in B vorhanden ist, die der ON-Bedingung entspricht, wird eine zusätzliche B-Zeile mit generiert Alle Spalten sind auf NULL gesetzt. –
Danke! Ich habe das nochmal angeschaut und du hast recht. TableFunction-Outer-Joins mit Prädikaten sind unterbrochen. Ich habe ein JIRA-Problem erstellt (https://issues.apache.org/jira/browse/FLINK-7730) und werde meine Antwort aktualisieren. Im Allgemeinen ist es eine gute Idee, direkt an die Flink Community zu gelangen, wenn Sie glauben, dass Sie einen Fehler gefunden haben. Solche Berichte gehen bei StackOverflow leicht verloren. Vielen Dank! –