Ich verwende Apache Flink DataSet API. Ich möchte einen Job implementieren, der mehrere Ergebnisse in verschiedene Dateien schreibt.Kann Flink Ergebnisse in mehrere Dateien schreiben (wie Hadoops MultipleOutputFormat)?
Wie kann ich das tun?
Ich verwende Apache Flink DataSet API. Ich möchte einen Job implementieren, der mehrere Ergebnisse in verschiedene Dateien schreibt.Kann Flink Ergebnisse in mehrere Dateien schreiben (wie Hadoops MultipleOutputFormat)?
Wie kann ich das tun?
Sie können so viele Datensenken zu einem DataSet
Programm hinzufügen, wie Sie benötigen.
Zum Beispiel in einem Programm wie folgt aus:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple3<String, Long, Long>> data = env.readFromCsv(...);
// apply MapFunction and emit
data.map(new YourMapper()).writeToText("/foo/bar");
// apply FilterFunction and emit
data.filter(new YourFilter()).writeToCsv("/foo/bar2");
Sie lesen einen DataSet
data
aus einer CSV-Datei. Diese data
auf zwei aufeinanderfolgenden Transformationen gegeben:
MapFunction
und das Ergebnis wird in eine Textdatei geschrieben.FilterFunction
und die nicht gefilterten Tupel werden in eine CSV-Datei geschrieben.Sie können auch mehrere Datenquellen und Zweig und Datensätze verschmelzen (mit union
, join
, coGroup
, cross
oder Broadcast-Sets), wie Sie möchten.
Sie HadoopOutputFormat
API in Flink wie diese verwenden:
class IteblogMultipleTextOutputFormat[K, V] extends MultipleTextOutputFormat[K, V] {
override def generateActualKey(key: K, value: V): K =
NullWritable.get().asInstanceOf[K]
override def generateFileNameForKeyValue(key: K, value: V, name: String): String =
key.asInstanceOf[String]
}
und wir können mit IteblogMultipleTextOutputFormat
wie folgt:
val multipleTextOutputFormat = new IteblogMultipleTextOutputFormat[String, String]()
val jc = new JobConf()
FileOutputFormat.setOutputPath(jc, new Path("hdfs:///user/iteblog/"))
val format = new HadoopOutputFormat[String, String](multipleTextOutputFormat, jc)
val batch = env.fromCollection(List(("A", "1"), ("A", "2"), ("A", "3"),
("B", "1"), ("B", "2"), ("C", "1"), ("D", "2")))
batch.output(format)
für weitere Informationen können Sie sehen: http://www.iteblog.com/archives/1667