Im Datenfluß-SDK, das einfachste, was man tun kann, ist eine Überprüfung addieren Sie externe Service in Ihrem ersten Element zu initialisieren:
class DatastoreCallingDoFn extends DoFn {
private ExtServiceHandle handle = null;
private ExtServiceHandle initializeConnection() {
// ...
}
public void processElement(ProcessContext c) {
// ... process each element -- setup will have been called
if (handle == null) {
handle = initializeConnection();
}
// Process elements
}
}
Wenn Sie Strahl verwenden, können Sie die @Setup
verwenden Dekorator um eine Funktion in Ihrem DoFn
zu dekorieren, um die Einrichtung Ihres DoFn, wie die Initialisierung der Datenspeicherverbindung zu tun.
class DatastoreCallingDoFn extends DoFn {
@Setup
public void initializeDatastoreConnection() {
// ...
}
@ProcessElement
public void processElement(ProcessContext c) {
// ... process each element -- setup will have been called
}
}
Dies ist ähnlich dem answer in this question.
Seine Streaming-Pipeline, so Setup-Methode wird für jedes Element aufgerufen werden, und ich fand die 'Setup' oder' ProcessElement' Annotationen in Standard-Datenflussabhängigkeit. –
Es tut mir leid. '@ Setup' und' ProcessElement' sind Teil der Beam-Bibliothek. Ich bin verwirrt. Allerdings wird '@ Setup' nicht pro Element aufgerufen. Es heißt pro-DoFn-Instanz. Ich werde die Frage bearbeiten, um beide Optionen zu berücksichtigen. – Pablo