Apache Flink union operator giving wrong response
I am applying the union operator to two DataStreams of generic record type (GenericRecord).
package com.gslab.com.dataSets;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkBroadcast {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        List<String> controlMessageList = new ArrayList<String>();
        controlMessageList.add("controlMessage1");
        controlMessageList.add("controlMessage2");

        List<String> dataMessageList = new ArrayList<String>();
        dataMessageList.add("Person1");
        dataMessageList.add("Person2");
        dataMessageList.add("Person3");
        dataMessageList.add("Person4");

        DataStream<String> controlMessageStream = env.fromCollection(controlMessageList);
        DataStream<String> dataMessageStream = env.fromCollection(dataMessageList);

        DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc")));
                gr.put("TYPE", value);
                return gr;
            }
        });

        DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() {
            @Override
            public GenericRecord map(String value) throws Exception {
                Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc")));
                gr.put("FIRSTNAME", value);
                gr.put("LASTNAME", value + ": lastname");
                return gr;
            }
        });

        // Displaying generic records
        dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data before union: " + value);
                return value;
            }
        });

        controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() {
            @Override
            public GenericRecord map(GenericRecord value) throws Exception {
                System.out.println("data after union: " + value);
                return value;
            }
        });

        env.execute("stream");
    }
}
Output:
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"}
data after union: {"TYPE": "controlMessage1"}
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"TYPE": "controlMessage2"}
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"}
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"}
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"}
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"}
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"}
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"}
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(1/2) switched to FINISHED
05/09/2016 13:02:13 Map(2/2) switched to FINISHED
05/09/2016 13:02:13 Job execution switched to status FINISHED.
As you can see, the records from dataMessageGenericRecordStream are not correct after the union: every field value is replaced by the value of the first field.
I also posted on your other question. Could you please print the 'TypeInformation' for each DataStream? You can do that with 'DataStream.getType()', i.e. 'System.out.println(dataMessageGenericRecordStream.getType())'. – aljoscha
Printing dataMessageGenericRecordStream.getType() gives: GenericType. Printing controlMessageGenericRecordStream.getType() gives: GenericType. –
This is reproducible only for GenericRecord; when I change it to Map, it works. Can you suggest a workaround? –
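The last comment reports that the problem goes away when the records are carried as a Map instead of a GenericRecord. Below is a minimal sketch of that idea in plain Java, with no Flink or Avro on the classpath. The class name RecordAsMap and both method names are hypothetical and not part of the original program. The helpers build the same fields as the two MapFunction bodies in the question, only as java.util.Map. Whether this avoids the issue in a given Flink version is an assumption based solely on that comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original program: builds the same
// fields as the two MapFunction bodies in the question, but as plain Maps.
class RecordAsMap {

    // Mirrors the control-message mapper: a single TYPE field.
    static Map<String, Object> controlMessageToMap(String value) {
        Map<String, Object> record = new HashMap<>();
        record.put("TYPE", value);
        return record;
    }

    // Mirrors the data-message mapper: FIRSTNAME and LASTNAME fields.
    static Map<String, Object> dataMessageToMap(String value) {
        Map<String, Object> record = new HashMap<>();
        record.put("FIRSTNAME", value);
        record.put("LASTNAME", value + ": lastname");
        return record;
    }

    public static void main(String[] args) {
        // Print one record of each kind to check the field contents.
        System.out.println(controlMessageToMap("controlMessage1"));
        System.out.println(dataMessageToMap("Person1"));
    }
}
```

In the job, each helper would presumably be called from the corresponding MapFunction<String, Map<String, Object>>, so that both streams are of type DataStream<Map<String, Object>> before broadcast().union(...) is applied.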