Guten Morgen.Apache Airflow - Trigger/Zeitplan DAG erneut bei Abschluss (File Sensor)
Ich versuche, auch für das Einrichten einer DAG
- Uhr/Sinn für eine Datei
- Verfahren zur Herstellung eines Netzwerkordner der Datei
- Archiv der Datei
Mit dem Hit Tutorials online und stackoverflow Ich habe die folgenden DAG und Operator gefunden, die erfolgreich die Ziele erreichen, aber ich möchte, dass die DAG nach dem Abschluss neu geplant oder erneut ausgeführt wird, so dass sie startet Hing/Sensing für eine andere Datei.
Ich habe versucht, eine Variable max_active_runs:1
und dann eine schedule_interval: timedelta(seconds=5)
setzen diese ja umterminiert die DAG aber beginnt Warteschlange Aufgabe und sperrt die Datei.
Irgendwelche Ideen willkommen, wie ich die DAG nach der archive_task erneut ausführen könnte?
Dank
DAG CODE
from airflow import DAG
from airflow.operators import PythonOperator, OmegaFileSensor, ArchiveFileOperator
from datetime import datetime, timedelta
from airflow.models import Variable
default_args = {
'owner': 'glsam',
'depends_on_past': False,
'start_date': datetime.now(),
'provide_context': True,
'retries': 100,
'retry_delay': timedelta(seconds=30),
'max_active_runs': 1,
'schedule_interval': timedelta(seconds=5),
}
dag = DAG('test_sensing_for_a_file', default_args=default_args)
filepath = Variable.get("soucePath_Test")
filepattern = Variable.get("filePattern_Test")
archivepath = Variable.get("archivePath_Test")
sensor_task = OmegaFileSensor(
task_id='file_sensor_task',
filepath=filepath,
filepattern=filepattern,
poke_interval=3,
dag=dag)
def process_file(**context):
file_to_process = context['task_instance'].xcom_pull(
key='file_name', task_ids='file_sensor_task')
file = open(filepath + file_to_process, 'w')
file.write('This is a test\n')
file.write('of processing the file')
file.close()
proccess_task = PythonOperator(
task_id='process_the_file', python_callable=process_file, dag=dag)
archive_task = ArchiveFileOperator(
task_id='archive_file',
filepath=filepath,
archivepath=archivepath,
dag=dag)
sensor_task >> proccess_task >> archive_task
FILE SENSOR OPERATOR
import os
import re
from datetime import datetime
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from airflow.operators.sensors import BaseSensorOperator
class ArchiveFileOperator(BaseOperator):
@apply_defaults
def __init__(self, filepath, archivepath, *args, **kwargs):
super(ArchiveFileOperator, self).__init__(*args, **kwargs)
self.filepath = filepath
self.archivepath = archivepath
def execute(self, context):
file_name = context['task_instance'].xcom_pull(
'file_sensor_task', key='file_name')
os.rename(self.filepath + file_name, self.archivepath + file_name)
class OmegaFileSensor(BaseSensorOperator):
@apply_defaults
def __init__(self, filepath, filepattern, *args, **kwargs):
super(OmegaFileSensor, self).__init__(*args, **kwargs)
self.filepath = filepath
self.filepattern = filepattern
def poke(self, context):
full_path = self.filepath
file_pattern = re.compile(self.filepattern)
directory = os.listdir(full_path)
for files in directory:
if not re.match(file_pattern, files):
return False
else:
context['task_instance'].xcom_push('file_name', files)
return True
class OmegaPlugin(AirflowPlugin):
name = "omega_plugin"
operators = [OmegaFileSensor, ArchiveFileOperator]
Eine große Methode, die Sie danken. –