Es ist möglich, komprimierte Dateien in den folgenden Formaten in Apache Flink zu lesen:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
Wie Sie aus den Paketnamen sehen können, tut Flink diese InputFormats des Hadoop verwenden. Dies ist ein Beispiel für das Lesen gz Dateien Flink Scala API: (Sie müssen mindestens Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Apache Flink hat nur build-in-Unterstützung für .deflate Dateien. Das Hinzufügen von Unterstützung für weitere Komprimierungscodecs ist einfach, wurde aber noch nicht durchgeführt.
Die Verwendung von HadoopInputFormats mit Flink verursacht keinen Leistungsverlust. Flink hat eingebaute Serialisierungsunterstützung für Hadoop Writable
Typen.