Ich habe einen kleinen Test mit Google Dataflow (Apache-Beam) eingerichtet. Der Anwendungsfall für das Experiment besteht darin, eine (CSV-) Datei zu erstellen und eine ausgewählte Spalte in eine (TXT-) Datei zu schreiben.Google Dataflow scheint zu fallen 1000. Rekord
Der Code für das Experiment ist, wie nachstehend aufgeführt:
from __future__ import absolute_import
import argparse
import logging
import re
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class EmitColDoFn(beam.DoFn):
first = True
header = ""
def __init__(self, i):
super(EmitColDoFn, self).__init__()
self.line_count = Metrics.counter(self.__class__, 'lines')
self.i = i
def process(self, element):
if self.first:
self.header = element
self.first = False
else:
self.line_count.inc()
cols = re.split(',', element)
return (cols[self.i],)
def run(argv=None):
"""Main entry point; defines and runs the wordcount pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='/users/sms/python_beam/data/MOCK_DATA (4).csv',
# default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='Input file to process.')
parser.add_argument('--output',
dest='output',
default="https://stackoverflow.com/users/sms/python_beam/data/",
# required=True,
help='Output file to write results to.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
p = beam.Pipeline(options=pipeline_options)
# Read the text file[pattern] into a PCollection.
lines = p | 'read' >> ReadFromText(known_args.input)
column = (lines
| 'email col' >> (beam.ParDo(EmitColDoFn(3)))
| "col file" >> WriteToText(known_args.output, ".txt", shard_name_template="SS_Col"))
result = p.run()
result.wait_until_finish()
if (not hasattr(result, 'has_job') # direct runner
or result.has_job): # not just a template creation
lines_filter = MetricsFilter().with_name('lines')
query_result = result.metrics().query(lines_filter)
if query_result['counters']:
lines_counter = query_result['counters'][0]
print "Lines committed", lines_counter.committed
run()
Die letzten Zeilen der Probe 1 unter:
990,Corabel,Feldbau,[email protected],Female,84.102.162.190,DJ
991,Kiley,Rottcher,[email protected],Male,91.97.155.28,CA
992,Glenda,Clist,[email protected],Female,24.98.253.127,UA
993,Ingunna,Maher,[email protected],Female,159.31.127.19,PL
994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL
Lauf dies erzeugt die erwartete Ausgabe von:
/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 996
Process finished with exit code 0
Jetzt für die seltsamen Ergebnisse. Im nächsten Durchlauf wird die Anzahl der Zeilen auf 1000 erhöht
994,Megan,Giacopetti,[email protected],Female,115.6.63.52,RU
995,Briny,Dutnall,[email protected],Female,102.81.33.24,SE
996,Jan,Caddan,[email protected],Female,115.142.222.106,PL
997,Shannen,Gaisford,[email protected],Female,167.255.222.92,RU
998,Lorianna,Slyne,[email protected],Female,54.169.60.13,CN
999,Franklin,Yaakov,[email protected],Male,122.1.92.236,CN
1000,Wilhelmine,Cariss,[email protected],Female,237.48.113.255,PL
Aber diesmal löschte ist
/usr/local/bin/python2.7
/Users/sms/Library/Preferences/PyCharmCE2017.1/scratches/scratch_4.py
No handlers could be found for logger "oauth2client.contrib.multistore_file"
Lines committed 999
Process finished with exit code 0
Inspektion der Ausgabedatei zeigt, dass die letzte Zeile nicht verarbeitet wurde.
[email protected]
[email protected]
[email protected]
[email protected]
[email protected]
Irgendwelche Ideen, was hier vor sich geht?