Ich versuche, Fallklassen in AVRO nach HDFS mit RollingSink in Flink zu schreiben. Um AVRO-Dateien durch HDFS deserialisierbar zu machen, verwende ich DataFileWriter, der FSDataOutputStream umschließt. Wenn ich versuche, zwischen DataFileWriter und FSDataOutputStream zu synchronisieren, um die Datendatei auf HDFS zu schließen, wird die Ausnahme ausgelöst und ich erhalte tatsächlich Daten in jeder anderen Datei. Gibt es eine Möglichkeit, fs-Stream mit Avro Writer in Flink Writer-Implementierung zu synchronisieren?Schreiben von Objekten, die mit Avro auf HDFS serialisiert wurden mit RollingSink in Flink [Scala]
Ich habe versucht mit DataFileWriter schließen() flush() sync() fsync(), aber alle fehlgeschlagen. Die Synchronisierungsmethode scheint am besten zu funktionieren. Ich habe auch versucht, in Write-Methode zu synchronisieren, die zu funktionieren schien, aber immer noch einen Fehler erzeugt und ich konnte nicht überprüfen, ob alle Daten in Dateien gespeichert sind.
class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] {
val serialVersionUID = 1L
var outputStream: FSDataOutputStream = null
var outputWriter: DataFileWriter[OutputContainer] = null
override def open(outStream: FSDataOutputStream): Unit = {
if (outputStream != null) {
throw new IllegalStateException("AvroWriter has already been opened.")
}
outputStream = outStream
if(outputWriter == null) {
val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$)
outputWriter = new DataFileWriter[OutputContainer](writer)
outputWriter.create(OutputContainer.SCHEMA$, outStream)
}
}
override def flush(): Unit = {}
override def close(): Unit = {
if(outputWriter != null) {
outputWriter.sync()
}
outputStream = null
}
override def write(element: OutputContainer) = {
if (outputStream == null) {
throw new IllegalStateException("AvroWriter has not been opened.")
}
outputWriter.append(element)
}
override def duplicate(): AvroWriter[OutputContainer] = {
new AvroWriter[OutputContainer]
}
}
Versuch RollingSink mit dem obigen Code gibt folgende Ausnahme auszuführen:
java.lang.Exception: Could not forward element to next operator
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158)
at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664)
Caused by: java.nio.channels.ClosedChannelException
at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353)
at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121)
at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366)
at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383)
at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401)
at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36)
at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476)
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more