2017-08-23 2 views
0

Ich versuche, eine Eingabetextdatei mit dem ersten Zeichen mit GCP Dataflow Python zu verarbeiten. Wenn das erste Zeichen eines Eintrags 'A' ist, möchte ich die Datei in A.txt und so weiter speichern. Ebenso habe ich jedem Charakter eine Nummer zugeordnet. Ich habe dafür zwei Hashmaps gespeichert. Im Folgenden ist mein Code:Fehler beim Ausführen von Python-Datenfluss-Job:

splitHashMap={'A':1,'F':4, 'J':4, 'Z':4, 'G':10, 'I':11}; 
fileHashMap= {'A':'A.txt','B':'B.txt','F':'F.txt','J':'J.txt','Z':'Z.txt','G':'G.txt','I':'I.txt'}; 
def to_table_row(x): 
    firstChar=x[0][0]; 
    global splitHashMap 
    global fileHashMap 
    print splitHashMap[firstChar]; 
    x | WriteToText(fileHashMap[firstChar]); 
    return {firstChar} 

Der Fehler mit der WriteToText Funktion und ist wie folgt:

PTransform Create: Refusing to treat string as an iterable. (string=u'AIGLM0012016-02-180000000112016-02-18-12.00.00.123456GB CARMB00132') [while running 'ToTableRows'] 

Könnte jemand bitte helfen Sie mir, dieses Problem zu beheben?

EDIT: Der Rest des Codes der Pipeline enthält, ist wie folgt:

arser = argparse.ArgumentParser() 
parser.add_argument('--input', 
        dest='input', 
        default='gs://dataflow-samples/shakespeare/kinglear.txt', 
        help='Input file to process.') 
parser.add_argument('--output', 
        dest='output', 
        help='Output file to write results to.') 
known_args, pipeline_args = parser.parse_known_args(None) 
pipeline_options = PipelineOptions(pipeline_args) 
pipeline_options.view_as(SetupOptions).save_main_session = True 
p = beam.Pipeline(options=pipeline_options) 


lines = p | 'read' >> ReadFromText(known_args.input) 


lines | 'ToTableRows' >> beam.Map(to_table_row); 

result = p.run() 

Ich bitte Sie, mir zu helfen, jetzt das Problem zu beheben. Der Befehl, den ich verwende, um die Python-Datei zu TUN ist:

python File_parse.py ---input temp.txt 

Temp.txt ist wie folgt:

Aasadasd asdasd adsad af 
Jdsad asdasd asd as 
A asdd ad agfsfg sfg 
Z afsdfrew320pjpoji 
Idadfsd w8480ujfds 

Die gewünschte Ausgabe ist, dass alle Dateien, die mit ‚A‘ gehen beginnen zu „A. txt "," B "gehe zu" B.txt "und so weiter. Es wäre großartig, wenn Sie den Code in Ihrer Antwort geschrieben hätten.

+0

Können Sie Ihren gesamten Pipeline-Code teilen? Sie können keine Strings in Transformationen übertragen, wie Sie es versuchen. Wenn Sie Ihre gesamte Pipeline teilen, helfen wir Ihnen, herauszufinden, wie Sie die benötigte Funktionalität schreiben können. – Pablo

+0

@Pablo: Ich habe die gesamte Pipeline hinzugefügt. Sie können einen Blick werfen – Nagaraju

+0

Ich habe Ihre Frage beantwortet. Ich ermutige Sie, die Etikette von StackOverflow zu überprüfen, da Sie mit Ihrer Frage einige Dinge verletzt haben. Wenn die Frage hilfreich ist, können Sie sie auch als solche auswählen. – Pablo

Antwort

0

Ihre Verwendung von WriteToText ist nicht geeignet. Sie können keine Zeichenfolge an eine PTransform übergeben. Stattdessen müssen Sie PCollections in PTransforms übergeben. Im folgenden Code können Sie separate PCollections für jeden Fall eines ersten Zeichens erstellen, und übergeben das

Was Sie in diesem Fall tun kann, so etwas wie dieses:

file_hash_map= {'A':'A.txt','B':'B.txt','F':'F.txt', 
       'J':'J.txt','Z':'Z.txt','G':'G.txt','I':'I.txt'} 
existing_chars = file_hash_map.keys() 

class ToTableRowDoFn(beam.DoFn): 
    def process(self, element): 
    first_char = element[0][0] 
    if first_char in file_hash_map: 
     yield pvalue.TaggedOutput(first_char, element) 
    else: 
     # When the first char of the word is not from the allowed 
     # characters, we just send it to the main output. 
     yield element 

lines = p | 'read' >> ReadFromText(known_args.input) 

multiple_outputs = (
    lines | 
    'ToTableRows' >> beam.ParDo(ToTableRowDoFn()) 
          .with_outputs(*existing_chars, main='main')); 

for pcollection_name in existing_chars: 
    char_pcollection = getattr(multiple_outputs, pcollection_name) 
    char_pcollection | WriteToFile(file_hash_map[pcollection_name]) 

Der Kern dieses Codes ist Auf der for Schleife, wo wir über jede der PCollections Ausgabe durchlaufen, und schreiben Sie ihren Inhalt einzeln in eine andere Datei.