2017-02-01 4 views
2

Es ist möglich, nicht verschachtelte JSON-Dateien auf Cloud Storage mit Datenfluss über zu lesen:Lesen verschachtelte JSON in Google Dataflow/Apache Strahl

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of())); 

Wenn ich will nur diese Protokolle mit minimaler Filterung BigQuery schreiben was ich tun kann so durch eine DoFn wie diese verwenden:

private static class Formatter extends DoFn<TableRow,TableRow> { 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 

      // .clone() since input is immutable 
      TableRow output = c.element().clone(); 

      // remove misleading timestamp field 
      output.remove("@timestamp"); 

      // set timestamp field by using the element's timestamp 
      output.set("timestamp", c.timestamp().toString()); 

      c.output(output); 
     } 
    } 
} 

Allerdings weiß ich nicht, wie verschachtelte Felder in der JSON-Datei auf diese Weise zuzugreifen.

  1. Wenn der TableRow enthält ein RECORDr genannt, ist es möglich, seine Schlüssel/Werte zuzugreifen, ohne weitere Serialisierung/Deserialisierung?
  2. Wenn ich serialisiert müssen/deserialisieren dich mit der Jackson Bibliothek, macht es mehr Sinn machen, eine der Standard-Coder von TextIO.Read statt TableRowJsonCoder zu verwenden, einen Teil der Leistung zurück zu gewinnen, die ich auf diese Weise zu verlieren?

EDIT

Die Dateien sind neu-Linie begrenzt, und in etwa so aussehen:

{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}} 
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}} 
+0

Wie werden die Dateien formatiert? Sind sie durch Zeilenumbruch getrennt oder werden Zeilenumbrüche möglicherweise in einem der verschachtelten Datensätze angezeigt? –

+0

Die Dateien sind durch Zeilenumbruch getrennt und ich erwarte keine Zeilenumbrüche in einem der verschachtelten Datensätze. Ich habe meine Frage bearbeitet, um ein Beispiel hinzuzufügen. – Tobi

Antwort

4

Ihre beste Wette wahrscheinlich das, was Sie # beschrieben in zu tun ist, 2 und verwendet Jackson direkt. Es ist am sinnvollsten, dass das TextIO-Leseelement das ausführt, für das es erstellt wurde - Zeilen aus einer Datei mit dem Zeichenfolgencodierer lesen - und dann die Elemente mit einem DoFn analysieren. Etwas wie das Folgende:

PCollection<String> lines = pipeline 
    .apply(TextIO.from("gs://bucket/...")); 
PCollection<TableRow> objects = lines 
    .apply(ParDo.of(new DoFn<String, TableRow>() { 
    @Override 
    public void processElement(ProcessContext c) { 
     String json = c.element(); 
     SomeObject object = /* parse json using Jackson, etc. */; 
     TableRow row = /* create a table row from object */; 
     c.output(row); 
    } 
    }); 

Beachten Sie, dass Sie dies auch mit mehreren ParDos tun können.

+0

Ich habe es am Ende mit mehreren ParDos gelöst, danke. – Tobi