2016-05-09 8 views
2

Ich bin Union-Operator auf zwei DataStream s Generische Datensatztyp anwenden.Apache Flink Union Operator geben falsche Antwort

package com.gslab.com.dataSets; 
import java.io.File; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericData.Record; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 

public class FlinkBroadcast { 
    public static void main(String[] args) throws Exception { 

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setParallelism(2); 

     List<String> controlMessageList = new ArrayList<String>(); 
     controlMessageList.add("controlMessage1"); 
     controlMessageList.add("controlMessage2"); 

     List<String> dataMessageList = new ArrayList<String>(); 
     dataMessageList.add("Person1"); 
     dataMessageList.add("Person2"); 
     dataMessageList.add("Person3"); 
     dataMessageList.add("Person4"); 

     DataStream<String> controlMessageStream = env.fromCollection(controlMessageList); 
     DataStream<String> dataMessageStream = env.fromCollection(dataMessageList); 

     DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() { 
      @Override 
      public GenericRecord map(String value) throws Exception { 
       Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc"))); 
       gr.put("TYPE", value); 
       return gr; 
      } 
     }); 

     DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() { 
      @Override 
      public GenericRecord map(String value) throws Exception { 
       Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc"))); 
       gr.put("FIRSTNAME", value); 
       gr.put("LASTNAME", value+": lastname"); 
       return gr; 
      } 
     }); 

     //Displaying Generic records 
     dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() { 
      @Override 
      public GenericRecord map(GenericRecord value) throws Exception { 
       System.out.println("data before union: "+ value); 
       return value; 
      } 
     }); 

     controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() { 
      @Override 
      public GenericRecord map(GenericRecord value) throws Exception { 
       System.out.println("data after union: " + value); 
       return value; 
      } 
     }); 
     env.execute("stream"); 
    } 
} 

Ausgang:

05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"} 
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"} 
data after union: {"TYPE": "controlMessage1"} 
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"} 
data after union: {"TYPE": "controlMessage2"} 
data after union: {"TYPE": "controlMessage2"} 
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"} 
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"} 
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"} 
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"} 
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"} 
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"} 
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED. 

Wie Sie Datensätze in dataMessageGenericRecordStream sehen, die nach Vereinigung nicht korrekt sind. Alle Feldwerte werden durch den ersten Feldwert ersetzt.

+0

Ich auch auf Ihre andere Frage gepostet. Könnten Sie bitte die 'TypeInformation' für jeden DataStream ausdrucken? Sie können das mit 'DataStream.getType()', d. H. 'System.out.println (dataMessageGenericRecordStream.getType())' erreichen. – aljoscha

+0

Druck dataMessageGenericRecordStream.getType(): GenericType Druck controlMessageGenericRecordStream.getType(): GenericType

+0

Dies ist reproduzierbar nur für GenericRecord, wenn i chang es zu Karte seine Arbeit. Können Sie mir ein Workarround vorschlagen? –

Antwort

1

Ich war in der DataSet-API mit einem ähnlichen Problem konfrontiert. Ich habe einige Avro-Dateien als GenericRecords gelesen und dieses seltsame Verhalten gesehen. Ich habe diese Problemumgehung verwendet - anstatt sie als GenericRecords zu lesen, lese ich sie als spezifische Datensätze (z. B. MyAvroObject) und verwende dann eine Map, um sie als GenericRecords zu konvertieren/typisieren.

schrieb ich einige Code Ihren Anwendungsfall mit DataSet-API zu testen, und es funktioniert mit dem oben workaround-

public static void maintest(String[] args) throws Exception { 
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    env.setParallelism(2); 

    List<String> queryList1 = new ArrayList<String>(); 
    queryList1.add("query1"); 
    queryList1.add("query2"); 

    List<String> queryList2 = new ArrayList<String>(); 
    queryList2.add("QUERY1"); 
    queryList2.add("QUERY2"); 
    queryList2.add("QUERY3"); 
    queryList2.add("QUERY4"); 

    DataSet<String> dataset1 = env.fromCollection(queryList1); 
    DataSet<String> dataset2 = env.fromCollection(queryList2); 

    DataSet<GenericRecord> genericDS1 = dataset1.map(new MapFunction<String, GenericRecord>() { 
     @Override 
     public GenericRecord map(String value) throws Exception { 
      Query query = Query.newBuilder().setQuery(value).build(); 
      return (GenericRecord) query; 
     } 
    }); 

    DataSet<GenericRecord> genericDS2 = dataset2.map(new MapFunction<String, GenericRecord>() { 
     @Override 
     public GenericRecord map(String value) throws Exception { 
      SearchEngineQuery searchEngineQuery = SearchEngineQuery.newBuilder().setSeQuery(value).build(); 
      return (GenericRecord) searchEngineQuery; 
     } 
    }); 

    genericDS2.map(new MapFunction<GenericRecord, GenericRecord>() { 
     @Override 
     public GenericRecord map(GenericRecord value) throws Exception { 
      System.out.println("DEBUG: data before union: " + value); 
      return value; 
     } 
    }); 

    genericDS1.union(genericDS2).map(new MapFunction<GenericRecord, GenericRecord>() { 
     @Override 
     public GenericRecord map(GenericRecord value) throws Exception { 
      System.out.println("DEBUG: data after union: " + value); 
      return value; 
     } 
    }).print(); 
} 

Wo Query und Searchengine Abfrage meine Avro Objekte (ähnlich Ihrer Steuernachrichtenliste und Datennachricht sind Liste).

Ausgang:

{"query": "query1"} 
{"se_query": "QUERY1"} 
{"se_query": "QUERY3"} 
{"query": "query2"} 
{"se_query": "QUERY2"} 
{"se_query": "QUERY4"} 
2

ich ein paar Tage zu untersuchen dies für ein anderes Thema verbracht haben (aber immer noch GenericRecord beteiligt) und haben die Ursache und die Lösung gefunden.

Root Cause: Innerhalb von Apache Avro „Schema.class“ das „Feld“ Position ein vergänglich ist und nicht durch Kryo serialisiert erhält, und daher wird als Position „0“ initialisiert, wenn innerhalb der Flink Pipeline deserialisiert.

Siehe die JIRA AVRO-1476, die dies beschreibt und speziell die Kyro-Serialisierung erwähnt.

Dies wurde in Avro 1.7.7

Lösung behoben: Flink müssen Avro 1.7.7 (oder später). Ich habe das Update in meinem lokalen Computer überprüft, indem ich die Avro-Klassen innerhalb von flink-dist_2.11-1.1.3.jar ersetzt habe und mein Problem wurde behoben. https://issues.apache.org/jira/browse/FLINK-5039 für diese

Es gibt eine PR jetzt: https://github.com/apache/flink/pull/2953

Und ich erwarte, dass wird es in der Flink baut 1.1.4 und 1.2.0 enthalten sein

ich die JIRA Problem für diese aktualisiert.