2016-11-22 4 views
1

Wir haben derzeit eine Python Apache Beam-Pipeline arbeiten und in der Lage, lokal ausgeführt werden. Wir sind gerade dabei, die Pipeline auf Google Cloud Dataflow laufen zu lassen und vollständig automatisiert zu sein, haben aber eine Einschränkung in der Pipeline-Überwachung von Dataflow/Apache Beam gefunden.Python Apache Beam Pipeline Status API Anruf

Gegenwärtig hat Cloud Dataflow zwei Möglichkeiten, den Status Ihrer Pipeline zu überwachen, entweder über ihre Benutzeroberfläche oder über gcloud in der Befehlszeile. Beide Lösungen eignen sich nicht für eine vollautomatisierte Lösung, bei der eine verlustfreie Dateiverarbeitung berücksichtigt werden kann.

Blick auf Apache Beam Github sie haben eine Datei, internal/apiclient.py, die es zeigt eine Funktion zu erhalten, den Status eines Auftrags, get_job verwendet wird.

Die eine Instanz, die wir get_job verwendet haben, ist in runners/dataflow_runner.py.

Das Endziel besteht darin, diese API zu verwenden, um den Status eines Jobs oder mehrerer Jobs zu erhalten, die automatisch ausgelöst werden, um sicherzustellen, dass sie alle über die Pipeline erfolgreich verarbeitet werden.

Kann uns jemand erklären, wie diese API nach der Ausführung unserer Pipeline (p.run()) verwendet werden kann? Wir verstehen nicht, woher runner in response = runner.dataflow_client.get_job(job_id) stammt.

Wenn jemand ein größeres Verständnis davon geben könnte, wie wir auf diesen API-Aufruf zugreifen können, während wir unsere Pipeline einrichten/ausführen, wäre das großartig!

Antwort

1

Ich landete nur herumspielen mit dem Code und fand, wie man die Jobdetails erhält. Unser nächster Schritt ist zu sehen, ob es eine Möglichkeit gibt, eine Liste aller Jobs zu erhalten.

# start the pipeline process 
pipeline     = p.run() 
# get the job_id for the current pipeline and store it somewhere 
job_id     = pipeline.job_id() 
# setup a job_version variable (either batch or streaming) 
job_version    = dataflow_runner.DataflowPipelineRunner.BATCH_ENVIRONMENT_MAJOR_VERSION 
# setup "runner" which is just a dictionary, I call it local 
local     = {} 
# create a dataflow_client 
local['dataflow_client'] = apiclient.DataflowApplicationClient(pipeline_options, job_version) 
# get the job details from the dataflow_client 
print local['dataflow_client'].get_job(job_id) 
Verwandte Themen