2016-03-24 4 views
0

Ich versuche, Fallklassen in AVRO nach HDFS mit RollingSink in Flink zu schreiben. Um AVRO-Dateien durch HDFS deserialisierbar zu machen, verwende ich DataFileWriter, der FSDataOutputStream umschließt. Wenn ich versuche, zwischen DataFileWriter und FSDataOutputStream zu synchronisieren, um die Datendatei auf HDFS zu schließen, wird die Ausnahme ausgelöst und ich erhalte tatsächlich Daten in jeder anderen Datei. Gibt es eine Möglichkeit, fs-Stream mit Avro Writer in Flink Writer-Implementierung zu synchronisieren?Schreiben von Objekten, die mit Avro auf HDFS serialisiert wurden mit RollingSink in Flink [Scala]

Ich habe versucht mit DataFileWriter schließen() flush() sync() fsync(), aber alle fehlgeschlagen. Die Synchronisierungsmethode scheint am besten zu funktionieren. Ich habe auch versucht, in Write-Methode zu synchronisieren, die zu funktionieren schien, aber immer noch einen Fehler erzeugt und ich konnte nicht überprüfen, ob alle Daten in Dateien gespeichert sind.

class AvroWriter[OutputContainer <: org.apache.avro.specific.SpecificRecordBase] extends Writer[OutputContainer] { 

    val serialVersionUID = 1L 

    var outputStream: FSDataOutputStream = null 
    var outputWriter: DataFileWriter[OutputContainer] = null 

    override def open(outStream: FSDataOutputStream): Unit = { 
    if (outputStream != null) { 
     throw new IllegalStateException("AvroWriter has already been opened.") 
    } 
    outputStream = outStream 

    if(outputWriter == null) { 
     val writer: DatumWriter[OutputContainer] = new SpecificDatumWriter[OutputContainer](OutputContainer.SCHEMA$) 
     outputWriter = new DataFileWriter[OutputContainer](writer) 
     outputWriter.create(OutputContainer.SCHEMA$, outStream) 
    } 
    } 

    override def flush(): Unit = {} 

    override def close(): Unit = { 
    if(outputWriter != null) { 
     outputWriter.sync() 
    } 
    outputStream = null 
    } 

    override def write(element: OutputContainer) = { 
    if (outputStream == null) { 
     throw new IllegalStateException("AvroWriter has not been opened.") 
    } 
    outputWriter.append(element) 
    } 

    override def duplicate(): AvroWriter[OutputContainer] = { 
    new AvroWriter[OutputContainer] 
    } 
} 

Versuch RollingSink mit dem obigen Code gibt folgende Ausnahme auszuführen:

java.lang.Exception: Could not forward element to next operator 
     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.run(LegacyFetcher.java:222) 
     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08.run(FlinkKafkaConsumer08.java:316) 
     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) 
     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) 
     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224) 
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
     at java.lang.Thread.run(Thread.java:744) 
Caused by: java.lang.RuntimeException: Could not forward element to next operator 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354) 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337) 
     at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:158) 
     at org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher$SimpleConsumerThread.run(LegacyFetcher.java:664) 
Caused by: java.nio.channels.ClosedChannelException 
     at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1353) 
     at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:98) 
     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) 
     at java.io.DataOutputStream.write(DataOutputStream.java:107) 
     at org.apache.avro.file.DataFileWriter$BufferedFileOutputStream$PositionFilter.write(DataFileWriter.java:446) 
     at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) 
     at java.io.BufferedOutputStream.write(BufferedOutputStream.java:121) 
     at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216) 
     at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150) 
     at org.apache.avro.file.DataFileStream$DataBlock.writeBlockTo(DataFileStream.java:366) 
     at org.apache.avro.file.DataFileWriter.writeBlock(DataFileWriter.java:383) 
     at org.apache.avro.file.DataFileWriter.sync(DataFileWriter.java:401) 
     at pl.neptis.FlinkKafkaConsumer.utils.AvroWriter.close(AvroWriter.scala:36) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.closeCurrentPartFile(RollingSink.java:476) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:419) 
     at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:373) 
     at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) 
     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351) 
     ... 3 more 

Antwort

0

ich endlich eine Lösung gefunden zu haben. Da der Stream von RollingSink verwaltet wird, kann er in einer Klasse, die Writer implementiert, nicht geschlossen werden. Wenn DataFileWriter andererseits einen Stream umschließt und eine Datei an hdfs ausgegeben werden soll, ist eine Synchronisierung oder ein Schließen erforderlich. Der Trick besteht darin, DataFileWriter nicht zu schließen, sondern zu synchronisieren und dann zu verwerfen, indem man ihm Null zuweist (nicht sehr idiomatisch, wenn man Scala und funktionale Programmierung in Betracht zieht, aber hey, Flink wird in Java entwickelt). Also löste dieser einfache Trick mein Problem:

override def close(): Unit = { 
    if(outputWriter != null) { 
     outputWriter.sync() 
    } 
    outputWriter = null 
    outputStream = null 
    } 
Verwandte Themen