2017-11-02 3 views
1

Ich habe eine Dataflow-Pipeline (SDK 2.1.0, Apache Beam 2.2.0), die einfach RDF liest (in N-Triples, also sind es nur Textdateien) von GCS, transformiert es irgendwie und schreibt es zurück zu GCS, aber in einem anderen Eimer. In dieser Pipeline verwende ich Seiteneingaben, die drei einzelne Dateien sind (eine Datei pro Seiteneingabe) und sie in einem ParDo verwenden.SideInputs korrumpieren die Daten in DataFlows Pipeline

Um mit RDF in Java zu arbeiten, verwende ich Apache Jena, also wird jede Datei in eine Instanz der Model-Klasse eingelesen. Da Dataflow keinen Coder dafür hat, habe ich es selbst entwickelt (RDFModelCoder, siehe unten). Es funktioniert gut in Anzahl der anderen Pipelines, die ich erstellt habe.

Das Problem mit dieser bestimmten Pipeline ist, wenn ich die Seiteneingaben hinzufügen, schlägt die Ausführung mit einer Ausnahme, die eine Beschädigung der Daten anzeigt, d. H. Einige Müll hinzugefügt. Sobald ich die Seiteneingaben entfernt habe, beendet die Pipeline die Ausführung erfolgreich.

Die Ausnahme (aus RDFModelCoder geworfen wird, siehe unten):

Caused by: org.apache.jena.atlas.RuntimeIOException: java.nio.charset.MalformedInputException: Input length = 1 
    at org.apache.jena.atlas.io.IO.exception(IO.java:233) 
    at org.apache.jena.atlas.io.CharStreamBuffered$SourceReader.fill(CharStreamBuffered.java:77) 
    at org.apache.jena.atlas.io.CharStreamBuffered.fillArray(CharStreamBuffered.java:154) 
    at org.apache.jena.atlas.io.CharStreamBuffered.advance(CharStreamBuffered.java:137) 
    at org.apache.jena.atlas.io.PeekReader.advanceAndSet(PeekReader.java:235) 
    at org.apache.jena.atlas.io.PeekReader.init(PeekReader.java:229) 
    at org.apache.jena.atlas.io.PeekReader.peekChar(PeekReader.java:151) 
    at org.apache.jena.atlas.io.PeekReader.makeUTF8(PeekReader.java:92) 
    at org.apache.jena.riot.tokens.TokenizerFactory.makeTokenizerUTF8(TokenizerFactory.java:48) 
    at org.apache.jena.riot.lang.RiotParsers.createParser(RiotParsers.java:57) 
    at org.apache.jena.riot.RDFParserRegistry$ReaderRIOTLang.read(RDFParserRegistry.java:198) 
    at org.apache.jena.riot.RDFParser.read(RDFParser.java:298) 
    at org.apache.jena.riot.RDFParser.parseNotUri(RDFParser.java:288) 
    at org.apache.jena.riot.RDFParser.parse(RDFParser.java:237) 
    at org.apache.jena.riot.RDFParserBuilder.parse(RDFParserBuilder.java:417) 
    at org.apache.jena.riot.RDFDataMgr.parseFromInputStream(RDFDataMgr.java:870) 
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:268) 
    at org.apache.jena.riot.RDFDataMgr.read(RDFDataMgr.java:254) 
    at org.apache.jena.riot.adapters.RDFReaderRIOT.read(RDFReaderRIOT.java:69) 
    at org.apache.jena.rdf.model.impl.ModelCom.read(ModelCom.java:305) 

Und hier kann man den Müll (am Ende) sehen:

<http://example.com/typeofrepresentative/08> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://www.w3.org/2002/07/owl#NamedIndividual> . ������** �����I��.�������������u������� 

Die Pipeline:

val one = p.apply(TextIO.read().from(config.getString("source.one"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val two = p.apply(TextIO.read().from(config.getString("source.two"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val three = p.apply(TextIO.read().from(config.getString("source.three"))) 
      .apply(Combine.globally(SingleValue())) 
      .apply(ParDo.of(ConvertToRDFModel(RDFLanguages.NTRIPLES))) 

val sideInput = PCollectionList.of(one).and(two).and(three) 
       .apply(Flatten.pCollections()) 
       .apply(View.asList()) 

p.apply(RDFIO.Read 
        .from(options.getSource()) 
        .withSuffix(RDFLanguages.strLangNTriples)) 
.apply(ParDo.of(SparqlConstructETL(config, sideInput)) 
         .withSideInputs(sideInput)) 
.apply(RDFIO.Write 
        .to(options.getDestination()) 
        .withSuffix(RDFLanguages.NTRIPLES)) 

Und nur um das ganze Bild hier sind Implementierungen von SingleValue und ConvertToRDFModel Pardos:

class SingleValue : SerializableFunction<Iterable<String>, String> { 
    override fun apply(input: Iterable<String>?): String { 
     if (input != null) { 
      return input.joinToString(separator = " ") 
     } 
     return "" 
    } 
} 

class ConvertToRDFModel(outputLang: Lang) : DoFn<String, Model>() { 
    private val lang: String = outputLang.name 

    @ProcessElement 
    fun processElement(c: ProcessContext?) { 
     if (c != null) { 
      val model = ModelFactory.createDefaultModel() 
      model.read(StringReader(c.element()), null, lang) 
      c.output(model) 
     } 
    } 
} 

Die Umsetzung RDFModelCoder:

class RDFModelCoder(private val decodeLang: String = RDFLanguages.strLangNTriples, 
        private val encodeLang: String = RDFLanguages.strLangNTriples) 
    : AtomicCoder<Model>() { 

    private val LOG = LoggerFactory.getLogger(RDFModelCoder::class.java) 

    override fun decode(inStream: InputStream): Model { 
     val bytes = StreamUtils.getBytes(inStream) 
     val model = ModelFactory.createDefaultModel() 

     model.read(ByteArrayInputStream(bytes), null, decodeLang) // the exception is thrown from here 

     return model 
    } 

    override fun encode(value: Model, outStream: OutputStream?) { 
     value.write(outStream, encodeLang, null) 
    } 

} 

überprüfte ich die Seiteneingabedateien mehrfach, es geht ihnen gut, sie haben UTF-8-Codierung.

+0

Dies ist kein Jena-Problem. Die Eingabe ist nicht gültig UTF-8. PeekReader schlürft ziemlich großen Puffer (128K Bytes), um die Umwandlung von Byte zu Zeichen zu verbessern. Daher MalformedInputException. Die Länge = 1 bedeutet, dass es sich um eine illegale Ein-Byte-UTF-8-Sequenz in diesem Block handelt. – AndyS

Antwort

3

Am wahrscheinlichsten ist der Fehler in der Implementierung RDFModelCoder. Bei der Implementierung von encode/decode ist zu beachten, dass die bereitgestellten InputStream und OutputStream nicht ausschließlich der aktuellen Instanz gehören, die codiert/decodiert wird. Z.B. Es könnte mehr Daten in der InputStream nach der codierten Form Ihrer aktuellen Model sein. Wenn Sie StreamUtils.getBytes(inStream) verwenden, greifen Sie beide Daten des aktuellen codierten Model und alles, was im Stream war.

Generell ist es beim Schreiben eines neuen Coder eine gute Idee, nur existierende Coder zu kombinieren, anstatt den Stream manuell zu analysieren: das ist weniger fehleranfällig. Ich würde vorschlagen, das Modell zu/von byte[] zu konvertieren und ByteArrayCoder.of() zu verwenden, um es zu codieren/zu decodieren.

+0

Leider kann ich nicht mit dem Serialisierungsmechanismus von Java oder Kryo (im Beispiel) serialisieren, weil das Modell nicht einfach serialisierbar ist, also muss ich es in ein Textformat schreiben und es zurücklesen. Ich habe versucht, Kryo für diese Aufgabe zu verwenden, aber ohne Erfolg. –

+0

Wie kann ich sicherstellen, dass ich nicht mehr als benötigt aus dem Eingabestream lesen? –

+0

Sieht so aus, als hätte es geholfen, ich teste immer noch, aber erste Tests bestanden! Ich schaute auf StringUtf8Coder und fand heraus, dass sie 'VarInt.encode' und' VarInt.decode' aufrufen, um die Länge eines Chunks im Stream zu schreiben/zu lesen. Ich weiß nicht, warum ich es nicht früher gefunden habe, vielleicht weil die Dokumentation nicht so klar ist ... –

1

Apache Jena bietet die Elephas IO Module, die Hadoop IO-Unterstützung haben, da Beam Hadoop InputFormat IO unterstützt, sollten Sie in der Lage sein, das zu verwenden, um Ihre NTriples-Datei einzulesen.

Dies wird wahrscheinlich wesentlich effizienter sein, da die NTriples support in Elephas Lage ist, die IO parallelisieren und Caching das gesamte Modell in den Speicher zu vermeiden (in der Tat ist es nicht Model überhaupt verwenden):

Configuration myHadoopConfiguration = new Configuration(false); 

// Set Hadoop InputFormat, key and value class in configuration 
myHadoopConfiguration.setClass("mapreduce.job.inputformat.class", 
           NTriplesInputFormat.class, InputFormat.class); 
myHadoopConfiguration.setClass("key.class", LongWritable.class, Object.class); 
myHadoopConfiguration.setClass("value.class", TripleWritable.class, Object.class); 
// Set any other Hadoop config you might need 

// Read data only with Hadoop configuration. 
p.apply("read", 
     HadoopInputFormatIO.<LongWritable, TripleWritable>read() 
     .withConfiguration(myHadoopConfiguration); 

Natürlich Dies kann erfordern, dass Sie Ihre gesamte Pipeline etwas umgestalten.

+0

Vielen Dank für Ihre Antwort! –

Verwandte Themen