1

Ich schreibe eine Dataflow-Streaming-Pipeline. In einer der Transformationen, DoFn möchte ich auf einen externen Dienst zugreifen - in diesem Fall ist es Datastore.Initialisieren externer Dienstverbindungen in Beam

Gibt es eine Best Practice für diese Art von Initialisierungsschritt? Ich möchte das Datenspeicherverbindungsobjekt nicht für jeden processElement-Methodenaufruf erstellen.

Antwort

2

Im Datenfluß-SDK, das einfachste, was man tun kann, ist eine Überprüfung addieren Sie externe Service in Ihrem ersten Element zu initialisieren:

class DatastoreCallingDoFn extends DoFn { 

    private ExtServiceHandle handle = null; 

    private ExtServiceHandle initializeConnection() { 
     // ... 
    } 

    public void processElement(ProcessContext c) { 
     // ... process each element -- setup will have been called 
     if (handle == null) { 
     handle = initializeConnection(); 
     } 
     // Process elements 
    } 
} 

Wenn Sie Strahl verwenden, können Sie die @Setup verwenden Dekorator um eine Funktion in Ihrem DoFn zu dekorieren, um die Einrichtung Ihres DoFn, wie die Initialisierung der Datenspeicherverbindung zu tun.

class DatastoreCallingDoFn extends DoFn { 
    @Setup 
    public void initializeDatastoreConnection() { 
     // ... 
    } 

    @ProcessElement 
    public void processElement(ProcessContext c) { 
     // ... process each element -- setup will have been called 
    } 
} 

Dies ist ähnlich dem answer in this question.

+0

Seine Streaming-Pipeline, so Setup-Methode wird für jedes Element aufgerufen werden, und ich fand die 'Setup' oder' ProcessElement' Annotationen in Standard-Datenflussabhängigkeit. –

+0

Es tut mir leid. '@ Setup' und' ProcessElement' sind Teil der Beam-Bibliothek. Ich bin verwirrt. Allerdings wird '@ Setup' nicht pro Element aufgerufen. Es heißt pro-DoFn-Instanz. Ich werde die Frage bearbeiten, um beide Optionen zu berücksichtigen. – Pablo