2016-09-09 4 views
1

Ich versuche mit flink eine CSV-Datei als Parkett zu schreiben. Ich verwende den folgenden Code und erhalte den Fehler.Flink in Parkett umwandeln Fehler

val parquetFormat = new HadoopOutputFormat[Void, String](new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

Ich bekomme den folgenden Build-Fehler. Kann mir bitte jemand helfen?

Typ nicht übereinstimmen; gefunden: parket.avro.AvroParquetOutputFormat erforderlich: org.apache.hadoop.mapreduce.OutputFormat [Leere, String] ingestion.scala/flink-scala/src/main/scala/com/sc/edl/flink linie 75 Scala Problem

Antwort

1

Sie möchten eine HadoopOutputFormat[Void, String] erstellen, die eine OutputFormat[Void, String] erfordert.

Sie bieten eine AvroParquetOutputFormat, die ParquetOutputFormat<IndexedRecord> erweitert. ParquetOutputFormat ist definiert als ParquetOutputFormat<T> extends FileOutputFormat<Void, T>.

So stellen Sie eine OutputFormat[Void, IndexedRecord] während HadoopOutputFormat[Void, String] erwartet eine OutputFormat[Void, String].

sollten Sie parquetFormat ändern

zu
val parquetFormat = new HadoopOutputFormat[Void, IndexedRecord](
    new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

Wenn die DataSet, die Sie nicht vom Typ schreiben wollen, ist (Void, IndexedRecord) Sie eine MapFunction hinzufügen sollten, dass Ihre Daten in (Void, IndexedRecord) Paare umwandelt.

+0

Danke Fabian, tut mir leid, aber ich bin neu mit diesem, können Sie bitte die richtige Syntax oder was falsch ist – Niki

+0

Ich erweiterte meine Antwort –

1

Dennoch bleibt das Problem bestehen, da das Flink Tupel ab sofort keine NULL Keys unterstützt. folgende Fehler auftreten: Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.

Die bessere Wahl KiteSDK zu verwenden wäre wie in diesem Beispiel erläuterte: https://github.com/nezihyigitbasi/FlinkParquet Also, wenn Sie dynamisches Schema benötigen, dann wird dieser Ansatz nicht funktionieren, weil Sie das Schema einhalten müssen streng. Außerdem ist dies besser zum Lesen als zum Schreiben.

Spark DataFrame funktioniert sehr gut mit Parkett nicht nur in Bezug auf API, sondern auch in Bezug auf die Leistung. Aber wenn man Flink benutzen will, muss man entweder darauf warten, dass die flink-Community die API freigibt oder den Parkett-Hadoop-Code selbst bearbeitet, was eine große Anstrengung sein könnte.

Nur diese Anschlüsse sind implementiert noch https://github.com/apache/flink/tree/master/flink-connectors Also, mein persönlicher Vorschlag wäre, wenn Sie Funke so gehen für sie verwenden können, hat es reifere api Produktion Fällen Gebrauch berücksichtigen. Wie Sie mit dem Grundbedürfnis mit flink stecken geblieben sind, könnten Sie auch woanders feststecken.

Verschwenden Sie keine Zeit, um sich mit Flink abzumühen. Ich habe viel Zeit verschwendet, um mit Standardoptionen wie Hive, Spark oder MR zu arbeiten.