0

Ich benutze das Python Beam SDK 0.6.0. Und ich möchte meine Ausgabe in JSON in Google Cloud Storage schreiben. Was ist der beste Weg, dies zu tun?Wie schreibe ich das Ergebnis in JSON-Dateien in gcs in Dataflow/Beam

I quess Ich kann WriteToText aus der Text IO-Senke verwenden, aber dann muss ich jede Zeile separat formatieren, oder? Wie speichere ich mein Ergebnis in gültigen JSON-Dateien, die Objektlisten enthalten?

Antwort

1

Ok, als Referenz, löste ich dies, indem ich meine eigene Spüle Gebäude auf der _TextSink von WriteToText im Strahl SDK verwendet.

Nicht sicher, ob dies der beste Weg ist, aber es funktioniert bisher gut. Ich hoffe, es könnte jemand anderem helfen.

import os 
import json 

import apache_beam as beam 
from apache_beam import coders 
from apache_beam.io.iobase import Write 
from apache_beam.transforms import PTransform 

class _JsonSink(beam.io.FileSink): 
    """A Dataflow sink for writing JSON files.""" 

    def __init__(self, 
       file_path_prefix, 
       file_name_suffix='', 
       num_shards=0, 
       shard_name_template=None, 
       coder=coders.ToStringCoder(), 
       compression_type=beam.io.CompressionTypes.AUTO): 

     super(_JsonSink, self).__init__(
      file_path_prefix, 
      file_name_suffix=file_name_suffix, 
      num_shards=num_shards, 
      shard_name_template=shard_name_template, 
      coder=coder, 
      mime_type='text/plain', 
      compression_type=compression_type) 
     self.last_rows = dict() 

    def open(self, temp_path): 
     """ Open file and initialize it w opening a list.""" 
     file_handle = super(_JsonSink, self).open(temp_path) 
     file_handle.write('[\n') 
     return file_handle 

    def write_record(self, file_handle, value): 
     """Writes a single encoded record converted to JSON and terminates the 
     line w a comma.""" 
     if self.last_rows.get(file_handle, None) is not None: 
      file_handle.write(self.coder.encode(
       json.dumps(self.last_rows[file_handle]))) 
      file_handle.write(',\n') 

     self.last_rows[file_handle] = value 

    def close(self, file_handle): 
     """Finalize the JSON list and close the file handle returned from 
     ``open()``. Called after all records are written. 
     """ 
     if file_handle is not None: 
      # Write last row without a comma 
      file_handle.write(self.coder.encode(
       json.dumps(self.last_rows[file_handle]))) 

      # Close list and then the file 
      file_handle.write('\n]\n') 
      file_handle.close() 


class WriteToJson(PTransform): 
    """PTransform for writing to JSON files.""" 

    def __init__(self, 
       file_path_prefix, 
       file_name_suffix='', 
       num_shards=0, 
       shard_name_template=None, 
       coder=coders.ToStringCoder(), 
       compression_type=beam.io.CompressionTypes.AUTO): 

     self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards, 
           shard_name_template, coder, compression_type) 

    def expand(self, pcoll): 
     return pcoll | Write(self._sink) 

die Senke verwenden ist ähnlich, wie Sie den Text Spüle verwenden:

pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json') 
0

jede Datei ist eine einzige Liste mit einem Bündel von Elementen enthalten, die schwierig, weil Sie gruppieren bräuchten eine Reihe von Elementen und dann schreiben Sie sie zusammen in eine Datei. Lassen Sie mich Ihnen raten, ein anderes Format zu verwenden.

Sie können das Format JSON Lines berücksichtigen, wobei jede Zeile in einer Datei ein einzelnes JSON-Element darstellt.

Die Umwandlung Ihrer Daten in JSON-Zeilen sollte ziemlich einfach sein. Die folgende Transformation sollte es tun:

class WriteToJsonLines(beam.PTransform): 
    def __init__(self, file_name): 
     self._file_name = file_name 

    def expand(self, pcoll): 
     return (pcoll 
       | 'format json' >> beam.Map(json.dumps) 
       | 'write to text' >> beam.WriteToText(self._file_name)) 

Schließlich, wenn Sie später möchten Ihre JSON-Linien-Dateien lesen, können Sie Ihre eigenen JsonLinesSource schreiben oder den in beam_utils verwenden.

+0

Lassen Sie mich wissen, wenn Ihre Daten nicht auf diese Weise organisiert sind, und ich werde versuchen, etwas herauszufinden, das für Sie funktioniert. – Pablo

+0

Danke. Ich brauche es wirklich, um JSON zu sein. Schrieb am Ende mein eigenes Waschbecken, das meiner Meinung nach funktionieren sollte. – while

+0

das ist fair! ;) – Pablo

Verwandte Themen