2015-04-03 23 views
7

Ich habe einen wikipedia Dump mit bzip2 komprimiert (heruntergeladen von http://dumps.wikimedia.org/enwiki/), aber ich möchte es nicht entpacken: Ich möchte es während der Dekomprimierung im laufenden Betrieb verarbeiten.BZip2 komprimierter Eingang für Apache Flink

Ich weiß, dass es möglich ist, es in einfachem Java zu tun (siehe z. B. Java - Read BZ2 file and uncompress/parse on the fly), aber ich frage mich, wie es in Apache Flink tut? Was ich wahrscheinlich brauche, ist so etwas wie https://github.com/whym/wikihadoop, aber für Flink, nicht Hadoop.

Antwort

5

Es ist möglich, komprimierte Dateien in den folgenden Formaten in Apache Flink zu lesen:

org.apache.hadoop.io.compress.BZip2Codec 
org.apache.hadoop.io.compress.DefaultCodec 
org.apache.hadoop.io.compress.DeflateCodec 
org.apache.hadoop.io.compress.GzipCodec 
org.apache.hadoop.io.compress.Lz4Codec 
org.apache.hadoop.io.compress.SnappyCodec 

Wie Sie aus den Paketnamen sehen können, tut Flink diese InputFormats des Hadoop verwenden. Dies ist ein Beispiel für das Lesen gz Dateien Flink Scala API: (Sie müssen mindestens Flink 0.8.1)

def main(args: Array[String]) { 

    val env = ExecutionEnvironment.getExecutionEnvironment 
    val job = new JobConf() 
    val hadoopInput = new TextInputFormat() 
    FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz")) 
    val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job) 

    lines.print 

    env.execute("Read gz files") 
} 

Apache Flink hat nur build-in-Unterstützung für .deflate Dateien. Das Hinzufügen von Unterstützung für weitere Komprimierungscodecs ist einfach, wurde aber noch nicht durchgeführt.

Die Verwendung von HadoopInputFormats mit Flink verursacht keinen Leistungsverlust. Flink hat eingebaute Serialisierungsunterstützung für Hadoop Writable Typen.