0

Ich habe einen kleinen Test mit Google Dataflow (Apache-Beam) eingerichtet. Der Anwendungsfall für das Experiment besteht darin, eine (CSV-) Datei zu erstellen und eine ausgewählte Spalte in eine (TXT-) Datei zu schreiben.Google Dataflow scheint zu fallen 1000. Rekord

Der Code für das Experiment ist, wie nachstehend aufgeführt:

from __future__ import absolute_import 

import argparse 
import logging 
import re 

import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.metrics import Metrics 
from apache_beam.metrics.metric import MetricsFilter 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 

class EmitColDoFn(beam.DoFn): 
    first = True 
    header = "" 
    def __init__(self, i): 
     super(EmitColDoFn, self).__init__() 
     self.line_count = Metrics.counter(self.__class__, 'lines') 
     self.i = i 

    def process(self, element): 
     if self.first: 
      self.header = element 
      self.first = False 
     else: 
      self.line_count.inc() 
      cols = re.split(',', element) 
      return (cols[self.i],) 

def run(argv=None): 
    """Main entry point; defines and runs the wordcount pipeline.""" 
    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         dest='input', 
         default='/users/sms/python_beam/data/MOCK_DATA (4).csv', 
#      default='gs://dataflow-samples/shakespeare/kinglear.txt', 
         help='Input file to process.') 
    parser.add_argument('--output', 
         dest='output', 
         default="https://stackoverflow.com/users/sms/python_beam/data/", 
#      required=True, 
         help='Output file to write results to.') 
    known_args, pipeline_args = parser.parse_known_args(argv) 

    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    p = beam.Pipeline(options=pipeline_options) 

    # Read the text file[pattern] into a PCollection. 
    lines = p | 'read' >> ReadFromText(known_args.input) 

    column = (lines 
      | 'email col' >> (beam.ParDo(EmitColDoFn(3))) 
      | "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col")) 

    result = p.run() 
    result.wait_until_finish() 

    if (not hasattr(result, 'has_job') # direct runner 
     or result.has_job): # not just a template creation 
     lines_filter = MetricsFilter().with_name('lines') 
     query_result = result.metrics().query(lines_filter) 
     if query_result['counters']: 
      lines_counter = query_result['counters'][0] 

     print "Lines committed", lines_counter.committed 
run() 

Die letzten Zeilen der Probe 1 unter:

990,Corabel,Feldbau,[email protected],Female,84.102.162.190,DJ 
991,Kiley,Rottcher,[email protected],Male,91.97.155.28,CA 
992,Glenda,Clist,[email protected],Female,24.98.253.127,UA 
993,Ingunna,Maher,[email protected],Female,159.31.127.19,PL 
994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU 
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE 
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL 

Lauf dies erzeugt die erwartete Ausgabe von:

/usr/local/bin/python2.7 
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py 
No handlers could be found for logger "oauth2client.contrib.multistore_file" 
Lines committed 996 

Process finished with exit code 0 

Jetzt für die seltsamen Ergebnisse. Im nächsten Durchlauf wird die Anzahl der Zeilen auf 1000 erhöht

994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU 
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE 
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL 
997,Shannen,Gaisford,[email protected],Female,167.255.222.92,RU 
998,Lorianna,Slyne,[email protected],Female,54.169.60.13,CN 
999,Franklin,Yaakov,[email protected],Male,122.1.92.236,CN 
1000,Wilhelmine,Cariss,[email protected],Female,237.48.113.255,PL 

Aber diesmal löschte ist

/usr/local/bin/python2.7 
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py 
No handlers could be found for logger "oauth2client.contrib.multistore_file" 
Lines committed 999 

Process finished with exit code 0 

Inspektion der Ausgabedatei zeigt, dass die letzte Zeile nicht verarbeitet wurde.

[email protected] 
[email protected] 
[email protected] 
[email protected] 
[email protected] 

Irgendwelche Ideen, was hier vor sich geht?

Antwort

5

'EditColDoFn' überspringt die erste Zeile, vorausgesetzt, es gibt eine Instanz für jede Datei. Wenn Sie mehr als 1000 Zeilen haben, erstellt der DirectRunner zwei Bündel: 1000 Zeilen in der ersten Zeile und 1 Zeile in der zweiten Zeile. In einer Beam-Anwendung kann die Eingabe für die parallele Verarbeitung in mehrere Bundles aufgeteilt werden. Es gibt keine Korrelation mit der Anzahl der Dateien und der Anzahl der Pakete. Die gleiche Anwendung kann Terabyte von Daten verarbeiten, die über viele Dateien verteilt sind.

ReadFromText hat eine Option 'skip_header_lines', die Sie auf 1 setzen können, um die Kopfzeile in jeder Ihrer Eingabedateien zu überspringen.

Verwandte Themen