Es scheint, dass dies unterstützt wird, wenn Sie ein HQL-Skript mit $ {xxx} vars übergeben und es vorverarbeitet wird, um es in {{xxx}} zu konvertieren. Jinja-Stil vor der Bühne, der das Template-Rendering tatsächlich ausführt, um diese dann durch Werte aus einem vom Benutzer bereitgestellten Wörterbuch zu ersetzen. Ich glaube, das, weil es eine Funktion wie diese in der HiveOperator Klasse:
def prepare_template(self):
if self.hiveconf_jinja_translate:
self.hql = re.sub(
"(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
if self.script_begin_tag and self.script_begin_tag in self.hql:
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
Das Problem ist, ich kann nicht herausfinden, wie man dieses Stück Code auszulösen, bevor die Vorlage Rendering Stufe aufgerufen. Ich habe eine grundlegende DAG Skript wie folgt:
from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, timedelta
default_args = dict(
owner='mpetronic',
depends_on_past=False,
start_date=datetime(2017, 5, 2),
verbose=True,
retries=1,
retry_delay=timedelta(minutes=5)
)
dag = DAG(
dag_id='report',
schedule_interval='* * * * *',
user_defined_macros=dict(a=1, b=2),
default_args=default_args)
hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read()
task = HiveOperator(
task_id='report_builder',
hive_cli_conn_id='hive_dv',
schema='default',
mapred_job_name='report_builder',
hiveconf_jinja_translate=True,
dag=dag,
hql=hql)
kann ich sehen, dass mein user_defined_macros Wörterbuch es auf den Code macht, wo es mit einem globalen jinja Kontext-Wörterbuch verschmolzen wird, die dann zu meinem HQL Skript angewandt wird, machen als eine Vorlage. Da meine HQL native HQL ist, haben alle meine Variablen, die ich aktualisieren möchte, die Form $ {xxx} und jinja überspringt sie einfach. Ich brauche zuerst einen Luftstrom, um prepare_template() aufzurufen, aber ich sehe einfach nicht, wie ich das erreichen kann.
Ich weiß, ich konnte nur manuell meine HQL $ {xxx} in {{xxx}} ändern, aber das scheint ein Anti-Pattern zu sein. Ich möchte, dass das Skript nativ oder über den Luftstrom funktioniert. Dies ist die Funktion, in der TaskInstance Klasse, die meine manuell geändert rendert {{xxx}} Werte:
def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
if self.task.dag.user_defined_macros:
jinja_context.update(
self.task.dag.user_defined_macros)
rt = self.task.render_template # shortcut to method
for attr in task.__class__.template_fields:
content = getattr(task, attr)
if content:
rendered_content = rt(attr, content, jinja_context)
setattr(task, attr, rendered_content)
In der Lage, das oben genannte Arbeiten in Luftstrom 1.8 zu erhalten, werden die $ {xxx} Formvariablen in der hql Datei durch entsprechende Einträge in den user_defined_macros ersetzt. Haben Sie vielleicht versucht, einen Logger in der höheren Stufe des Codes hinzuzufügen? In meinem Fall bin ich in der Lage, die Invokation mit entsprechendem Ersatz zu sehen. –