2017-10-31 4 views
1

Ich versuche, die folgende einfache Abfrage in Flink Dataset API zu implementieren.Apache Flink: NullPointerException in DataSet-API Outer Join

select 
    t1_value1 
from 
    table1 
where 
    t1_suppkey not in ( 
     select 
      t2_suppkey 
     from 
      table2 
    ) 

So war meine Idee, ein Left Outer Join (table1.leftOuterJoin (Tabelle 2) ...) durchzuführen und dann alle Zeilen zu löschen, wo ich einen Wert für t1_suppkey und t2_suppkey erhalten. So

Ich habe versucht, es wie folgt aus:

 output = table1 
    .leftOuterJoin(table2).where("t1_suppkey").equalTo("t2_suppkey") 
    .with((Table1 t1, Table2 t2) -> new Tuple2<>(t1.ps_suppkey, t2.s_suppkey)) 
    .returns(new TypeHint <Tuple2<Integer, Integer>>() {}); 

Allerdings, wenn ich es so tun es nicht immer mit „java.lang.NullPointerException“ und ich bin mir nicht sicher, warum. Wenn ich einen normalen Join anstelle eines Left Outer Join verwende, funktioniert der Code, aber das ist nicht das, was ich möchte.

Muss ich eine Left Join anders implementieren oder gibt es eine einfachere Möglichkeit, die "not in" -Anweisung in der Dataset-API neu zu schreiben?

Antwort

0

Der äußere Join der DataSet-API ruft die JoinFunction auch für äußere Datensätze auf, die auf der inneren Seite keinen Beitrittsdatensatz finden. In diesem Fall the JoinFunction.join() method is called with null.

Da Sie einen LEFT OUTER JOIN verwenden, kann das zweite Argument Table2 t2null sein. Die NullPointerException wird verursacht durch t2.s_suppkey. Sie müssen nach t2 == null suchen und nur auf t2 zugreifen, wenn es nicht null ist.

können Sie implementieren auch die NICHT mit einem FlatJoinFunction beitreten, die ein Collector Argument hat und nur t1 wenn t2 == null emittieren.

Eine weitere Option ist die Verwendung von Flink's Batch SQL support, die die Abfrage in Ihrem Beispiel unterstützt.

+0

Danke, löste es wie [dies] (https://Stackoverflow.com/a/47074758/8180276) und es funktioniert. – Rhyzx

0
output = table1 
.leftOuterJoin(table2) 
.where("t1_suppkey").equalTo("t2_suppkey") 
.with((Table1 t1, Table2 t2, Collector<Tuple2<Integer, Integer>> c) -> { 
if(t2 == null) { 
    c.collect(new Tuple2<>(t1.t1_suppkey, t1.t1_value1)); 
} 
else { 
    //Do nothing. 
}})