2

Guten Morgen.Apache Airflow - Trigger/Zeitplan DAG erneut bei Abschluss (File Sensor)

Ich versuche, auch für das Einrichten einer DAG

  1. Uhr/Sinn für eine Datei
  2. Verfahren zur Herstellung eines Netzwerkordner der Datei
  3. Archiv der Datei

Mit dem Hit Tutorials online und stackoverflow Ich habe die folgenden DAG und Operator gefunden, die erfolgreich die Ziele erreichen, aber ich möchte, dass die DAG nach dem Abschluss neu geplant oder erneut ausgeführt wird, so dass sie startet Hing/Sensing für eine andere Datei.

Ich habe versucht, eine Variable max_active_runs:1 und dann eine schedule_interval: timedelta(seconds=5) setzen diese ja umterminiert die DAG aber beginnt Warteschlange Aufgabe und sperrt die Datei.

Irgendwelche Ideen willkommen, wie ich die DAG nach der archive_task erneut ausführen könnte?

Dank

DAG CODE

from airflow import DAG 
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator 
from datetime import datetime, timedelta 
from airflow.models import Variable 

default_args = { 
    'owner': 'glsam', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'provide_context': True, 
    'retries': 100, 
    'retry_delay': timedelta(seconds=30), 
    'max_active_runs': 1, 
    'schedule_interval': timedelta(seconds=5), 
} 

dag = DAG('test_sensing_for_a_file', default_args=default_args) 

filepath = Variable.get("soucePath_Test") 
filepattern = Variable.get("filePattern_Test") 
archivepath = Variable.get("archivePath_Test") 

sensor_task = OmegaFileSensor(
    task_id='file_sensor_task', 
    filepath=filepath, 
    filepattern=filepattern, 
    poke_interval=3, 
    dag=dag) 


def process_file(**context): 
    file_to_process = context['task_instance'].xcom_pull(
     key='file_name', task_ids='file_sensor_task') 
    file = open(filepath + file_to_process, 'w') 
    file.write('This is a test\n') 
    file.write('of processing the file') 
    file.close() 


proccess_task = PythonOperator(
    task_id='process_the_file', python_callable=process_file, dag=dag) 

archive_task = ArchiveFileOperator(
    task_id='archive_file', 
    filepath=filepath, 
    archivepath=archivepath, 
    dag=dag) 

sensor_task >> proccess_task >> archive_task 

FILE SENSOR OPERATOR

import os 
    import re 

    from datetime import datetime 
    from airflow.models import BaseOperator 
    from airflow.plugins_manager import AirflowPlugin 
    from airflow.utils.decorators import apply_defaults 
    from airflow.operators.sensors import BaseSensorOperator 


    class ArchiveFileOperator(BaseOperator): 
     @apply_defaults 
     def __init__(self, filepath, archivepath, *args, **kwargs): 
      super(ArchiveFileOperator, self).__init__(*args, **kwargs) 
      self.filepath = filepath 
      self.archivepath = archivepath 

     def execute(self, context): 
      file_name = context['task_instance'].xcom_pull(
       'file_sensor_task', key='file_name') 
      os.rename(self.filepath + file_name, self.archivepath + file_name) 


    class OmegaFileSensor(BaseSensorOperator): 
     @apply_defaults 
     def __init__(self, filepath, filepattern, *args, **kwargs): 
      super(OmegaFileSensor, self).__init__(*args, **kwargs) 
      self.filepath = filepath 
      self.filepattern = filepattern 

     def poke(self, context): 
      full_path = self.filepath 
      file_pattern = re.compile(self.filepattern) 

      directory = os.listdir(full_path) 

      for files in directory: 
       if not re.match(file_pattern, files): 
        return False 
       else: 
        context['task_instance'].xcom_push('file_name', files) 
        return True 


    class OmegaPlugin(AirflowPlugin): 
     name = "omega_plugin" 
     operators = [OmegaFileSensor, ArchiveFileOperator] 

Antwort

1

Set schedule_interval=None und airflow trigger_dag Befehl von BashOperator verwenden, um nächste Ausführung bei Beendigung der vorherigen zu starten.

trigger_next = BashOperator(task_id="trigger_next", 
      bash_command="airflow trigger_dag 'your_dag_id'", dag=dag) 

sensor_task >> proccess_task >> archive_task >> trigger_next 

Sie können Ihren ersten Lauf manuell mit dem gleichen airflow trigger_dag Befehl beginnen und dann trigger_next Aufgabe wird automatisch die nächste auslösen. Wir verwenden das seit vielen Monaten in der Produktion und es läuft perfekt.

+0

Eine große Methode, die Sie danken. –

4

Dmitris Methode hat perfekt funktioniert.

fand ich auch in meiner Lektüre schedule_interval=None Einstellung und dann die TriggerDagRunOperator mit arbeitete ebenso gut

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_RBCPV99_rerun', 
    trigger_dag_id="RBCPV99_v2", 
    dag=dag) 

sensor_task >> proccess_task >> archive_task >> trigger 
Verwandte Themen