2017-06-23 5 views
1

Ich habe eine abstrakte Klasse, deren abstrakte Methode eine SourceFunction erstellt, so abgeleitete Klassen können einfache oder komplexere Quellen zurückgeben (z. B. KafkaConsumers usw.). ChangeMe ist eine einfache automatisch generierte Klasse, die durch die Erstellung eines AvroSchema erstellt wurde.Typ löschen & Flink: Was verursacht Laufzeitfehler?

public SourceFunction<ChangeMe> createSourceFunction(ParameterTool params) { 
     FromElementsFunction<ChangeMe> dataSource = null; 

     List<ChangeMe> changeMeList = Arrays.asList(
       ChangeMe.newBuilder().setSomeField("Some field 1").build(), 
       ChangeMe.newBuilder().setSomeField("Some field 2").build(), 
       ChangeMe.newBuilder().setSomeField("Some field 3").build() 
     ); 
     try { 
      dataSource = new FromElementsFunction<>(new AvroSerializer<>(ChangeMe.class), changeMeList); 
     } 
     catch (IOException ex){ 

     } 

     return dataSource; 
} 

In meinem Flink Job diesen Ich habe im Grunde:

SourceFunction<ChangeMe> source = createSourceFunction(params); 
DataStream<T> sourceDataStream = streamExecutionEnvironment.addSource(source); 


DataStream<ChangeMe> changeMeEventsStream = this.getSourceDataStream(); // gets sourceDataStream above 
changeMeEventsStream.print(); 

Als ich den Auftrag ausführen, bekomme ich diesen Fehler in Bezug auf den Anruf() drucken:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'Custom Source' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface. 
…… 
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'T' in 'class org.apache.flink.streaming.api.functions.source.FromElementsFunction' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 

Ich benutze den Eclipse-Compiler, also hatte ich gedacht, dass die Typinformation enthalten sein würde (obwohl ich dachte, dass dies nur für Lambdas war, und es gibt keine im obigen). Was muss ich tun, damit dies korrekt ausgeführt wird?

+0

Es kann nicht sein, so etwas wie eine „Funktion‚Custom Source‘“, weil Funktion (Methode) Namen Leerzeichen enthalten nicht. –

Antwort

1

Wenn Sie möchten, um direkt eine FromElementsFunction instanziiert, dann müssen Sie manuell eine TypeInformation Instanz für die ChangeMe Klasse liefern, wenn addSource aufrufen. Dies ist notwendig, damit Flink etwas über den Elementtyp erfährt.

Der folgende Codeausschnitt sollte es tun:

SourceFunction<ChangeMe> source = createSourceFunction(); 

TypeInformation<ChangeMe> typeInfo = TypeInformation.of(ChangeMe.class); 
DataStream<ChangeMe> sourceDataStream = env.addSource(source, typeInfo); 

DataStream<ChangeMe> changeMeEventsStream = sourceDataStream; 
changeMeEventsStream.print(); 
Verwandte Themen