2017-10-10 2 views
0

Wenn ich eine .txt-Datei in GCS mit einer Liste von Wörtern gespeichert habe, die als Teil einer beam.Filter verwendet wird, kann diese Liste dynamisch innerhalb meiner Apache-Beam-Pipeline aufgerufen werden? Ich weiß, dass ich diese Liste als eine globale Variable innerhalb der Pipeline definieren kann, aber ich bin nicht sicher, wie man die ganze Datei in eine Liste einliest und ob es Balkentricks gibt, um dies zu erreichen. Irgendwelche Vorschläge? Hier ist meine aktuelle Implementierung, die nicht funktioniert ..Google Cloud Dataflow Zugriff .txt-Datei auf Cloud-Speicher

def boolean_terms(word, term_list): 
    if word in term_list: 
    return (word, 1) 
    else: 
    return (word, 0) 

# side table 
filter_terms = p | beam.io.ReadFromText(path_to_gcs_txt_file) 

words = ... 

filtered_words = words | beam.FlatMap(lambda x: 
    [boolean_terms(word, filter_terms) for word in x]) 

bekomme ich folgende Fehlermeldung „Typeerror: Argument vom Typ‚_InvalidUnpickledPCollection‘ist nicht iterable“

Antwort

3

Sie die Liste der Wörter als side input zugreifen . Ich glaube, die beam.Filter Transformation unterstützt die Verwendung von Seiteneingaben von der Filterfunktion in genau der gleichen Weise wie FlatMap und ParDo in den Beispielen von diesem Link.

Etwas wie:

words | beam.Filter(lambda x, filter_terms: word in filter_terms, 
        filter_terms=pvalue.AsList(p | beam.io.ReadFromText(path))) 
+0

Dank! Ich denke, ich bin näher, aber es scheint immer noch nicht für mich zu arbeiten. Fehle ich etwas? – reese0106

+0

Ah Ich denke, ich habe es herausgefunden - ich musste 'pvalue.AsList (filter_terms)' hinzufügen, damit dies richtig funktioniert – reese0106

Verwandte Themen