2017-09-27 2 views
0

Flink Version zurückgibt: 1.3.1Apache Flink: LEFT JOIN mit einem TableFunction nicht erwartetes Ergebnis

ich zwei Tabellen erstellt, ein aus dem Speicher, ein anderes ist von UDTF. Als ich Join und Left Join getestet habe, haben sie dasselbe Ergebnis zurückgegeben. Was ich erwartet hatte, war übrig, hatte mehr Reihen als beitreten.

Mein Testcode ist dies:

public class ExerciseUDF { 
     public static void main(String[] args) throws Exception { 
      test_3(); 
     } 
     public static void test_3() throws Exception { 
       // 1. set up execution environment 
       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
       BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); 

       DataSet<WC> input = env.fromElements(
         new WC("Hello", 1), 
         new WC("Ciao", 1), 
         new WC("Hello", 1)); 

       // 2. register the DataSet as table "WordCount" 
       tEnv.registerDataSet("WordCount", input, "word, frequency"); 

       Table table; 
       DataSet<WC> result; 
         DataSet<WCUpper> resultUpper; 
       table = tEnv.scan("WordCount"); 
       // 3. table left join user defined table 
       System.out.println("table left join user defined table"); 
       tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2()); 
       table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, S.word as myupper FROM WordCount as S left join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word"); 
       resultUpper = tEnv.toDataSet(table, WCUpper.class); 
       resultUpper.print(); // out put —— WCUpper Ciao 1 CIAO, however, without the row having Hello 

       // 4. table join user defined table 
       System.out.println("table join user defined table"); 
       tEnv.registerFunction("myTableUpperFunc",new MyTableFunc_2()); 
       table = tEnv.scan("WordCount"); 
       table = tEnv.sql("SELECT S.word as word, S.frequency as frequency, T.myupper as myupper FROM WordCount as S join LATERAL TABLE(myTableUpperFunc(S.word)) as T(word,myupper) on S.word = T.word" 
       ); 
       resultUpper = tEnv.toDataSet(table, WCUpper.class); 
       resultUpper.print(); 
      } 

      public static class WC { 
       public String word; 
       public long frequency; 

       // public constructor to make it a Flink POJO 
       public WC() { 
       } 

       public WC(String word, long frequency) { 
        this.word = word; 
        this.frequency = frequency; 
       } 

       @Override 
       public String toString() { 
        return "WC " + word + " " + frequency; 
       } 
      } 


      // user defined table function 
      public static class MyTableFunc_2 extends TableFunction<Tuple2<String,String>>{ 
       public void eval(String str){ // hello --> hello HELLO 
        System.out.println("upper func executed for "+str); 
        if(str.equals("Hello")){ 
         return; 
        } 
        collect(new Tuple2<String,String>(str,str.toUpperCase())); 
        // collect(new Tuple2<String,String>(str,str.toUpperCase())); 
       } 
      } 
    } 

Der Ausgang des linken verbinden und Join-Abfragen die gleichen sind. In beiden Fällen wird nur eine Zeile zurückgegeben.

WCUpper Ciao 1 CIAO

Aber ich denke, dass die linke Abfrage Reihen der 'Hallo' bewahren beitreten sollte.

Antwort

0

Ja, Sie haben Recht.

Dies ist ein Fehler in der Übersetzung von TableFunction Outer Joins mit Prädikaten und muss behoben werden.

Danke, Fabian

+0

Ich möchte wissen, welche Zeilen übereinstimmen und nicht übereinstimmen zwischen 'S' und' T'. Solch ein Ergebnis scheint nicht erreicht zu werden, indem einfach "ON TRUE" -Bedingung verwendet wird. Gibt es eine Möglichkeit, dies zu tun? Außerdem verhält sich mysql in join ... on ... anders, eine ''Hello''-Zeile wird trotzdem zurückgegeben, obwohl Join fehlgeschlagen ist. In diesem Zustand können einige Spalten' null 'sein https: //dev.mysql. com/doc/refman/5.7/de/left-join-optimization.html _Wenn es eine Zeile in A gibt, die der WHERE-Klausel entspricht, aber keine Zeile in B vorhanden ist, die der ON-Bedingung entspricht, wird eine zusätzliche B-Zeile mit generiert Alle Spalten sind auf NULL gesetzt. –

+0

Danke! Ich habe das nochmal angeschaut und du hast recht. TableFunction-Outer-Joins mit Prädikaten sind unterbrochen. Ich habe ein JIRA-Problem erstellt (https://issues.apache.org/jira/browse/FLINK-7730) und werde meine Antwort aktualisieren. Im Allgemeinen ist es eine gute Idee, direkt an die Flink Community zu gelangen, wenn Sie glauben, dass Sie einen Fehler gefunden haben. Solche Berichte gehen bei StackOverflow leicht verloren. Vielen Dank! –

Verwandte Themen