2017-10-11 3 views

Antwort

1

Es gibt keine Möglichkeit Größe des PCollection zu überprüfen ohne dass eine PTransform darauf angewendet wird (wie zB Count.globally() oder Combine.combineFn()), weil PCollection nicht wie eine typische Collection in Java SDK oder so aussieht.

Es ist eine Abstraktion der begrenzten oder unbegrenzten Sammlung von Daten, wo Daten in die Sammlung für eine Operation zugeführt wird, die darauf angewendet wird (z. B. PTransform). Auch ist es parallelisiert (wie das P am Anfang der Klasse suggeriert).

Daher benötigen Sie einen Mechanismus, um die Anzahl der Elemente von jedem Arbeiter/Knoten zu erhalten und sie zu kombinieren, um einen Wert zu erhalten. Ob es 0 oder n ist, kann bis zum Ende dieser Transformation nicht bekannt sein.

1

Sie haben nicht angegeben, welches SDK Sie verwenden, also nahm ich Python an. Der Code ist leicht zu Java portierbar.

Sie können das globale Zählen von Elementen anwenden und dann den numerischen Wert mithilfe eines einfachen Vergleichs auf Boolean umwandeln. Sie werden diesen Wert Seite-Eingang mit pvalue.AsSingleton Funktion der Lage sein, wie folgt aus:

import apache_beam as beam 
from apache_beam import pvalue 

is_empty_check = (your_pcollection 
        | "Count" >> beam.combiners.Count.Globally() 
        | "Is empty?" >> beam.Map(lambda n: n == 0) 
        ) 

another_pipeline_branch = (
    p 
    | beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check)) 
) 

Verwendung des Seiteneingangs ist die folgende:

def do_something(element, is_empty): 
    if is_empty: 
     # yes 
    else: 
     # no