2017-09-19 4 views
2

Verwenden Sie die neueste Version von Apache Luftstrom. Begann mit LocalExecutor, in diesem Modus funktionierte alles gut, außer einigen Interaktionen der Web-UI-Status, dass der SelleryExecutor benötigt wurde, um sie zu verwenden. Installiert und konfiguriert den Sellery-Executor mit Redis, konfiguriert Redis als Broker-URL und das Ergebnis-Backend.Apache Airflow Sellerie Redis DecodeError

Es scheint zunächst zu arbeiten, bis eine Aufgabe, an welcher Stelle geplant ist, werden folgenden Fehler gibt:

File "/bin/airflow", line 28, in <module> 
    args.func(args) 
    File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 882, in scheduler 
    job.run() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 201, in run 
    self._execute() 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1311, in _execute 
    self._execute_helper(processor_manager) 
    File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1444, in _execute_helper 
    self.executor.heartbeat() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 132, in heartbeat 
    self.sync() 
    File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 91, in sync 
    state = async.state 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 436, in state 
    return self._get_task_meta()['status'] 
    File "/usr/lib/python2.7/site-packages/celery/result.py", line 375, in _get_task_meta 
    return self._maybe_set_cache(self.backend.get_task_meta(self.id)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 352, in get_task_meta 
    meta = self._get_task_meta_for(task_id) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 668, in _get_task_meta_for 
    return self.decode_result(meta) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 271, in decode_result 
    return self.meta_from_decoded(self.decode(payload)) 
    File "/usr/lib/python2.7/site-packages/celery/backends/base.py", line 278, in decode 
    accept=self.accept) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib64/python2.7/contextlib.py", line 35, in __exit__ 
    self.gen.throw(type, value, traceback) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 54, in _reraise_errors 
    reraise(wrapper, wrapper(exc), sys.exc_info()[2]) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 50, in _reraise_errors 
    yield 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 263, in loads 
    return decode(data) 
    File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

Scheint eine Gurke Serialisierung Fehler zu sein, aber ich bin nicht sicher, wie das verfolgen Ursache. Irgendwelche Vorschläge?

Dieses Problem betrifft konsistent einen Workflow, bei dem ich die Subdag-Funktion verwende, möglicherweise hängt das Problem damit zusammen.

HINWEIS: Ich habe auch getestet mit RabbitMQ, hatte ein anderes Problem dort; Client zeigt "Verbindung zurückgesetzt durch Peer" und stürzt ab. Das RabbitMQ-Protokoll zeigt "Client unerwartet geschlossene TCP-Verbindung".

Antwort

0

ich auf diese stolperte nach dem exakt gleichen Backtrace in unseren Scheduler-Logs zu sehen:

File "/usr/lib/python2.7/site-packages/kombu/serialization.py", line 59, in pickle_loads 
    return load(BytesIO(s)) 
kombu.exceptions.DecodeError: invalid load key, '{'. 

Die Tatsache, dass Sellerie etwas zu unpickle versucht, die mit beginnt ‚{‘ schien verdächtig, so nahm ich einen tcpdump von der Verkehr und löste eine Aufgabe über das Web-UI. Die sich ergebende Erfassung enthalten diesen Austausch bei fast genau den gleichen Augenblick, dass die oben Backtrace in den Scheduler-Logs erschienen:

05:03:49.145849 IP <scheduler-ip-addr>.ec2.internal.45597 > <redis-ip-addr>.ec2.internal.6379: Flags [P.], seq 658:731, ack 46, win 211, options [nop,nop,TS val 654768546 ecr 4219564282], length 73: RESP "GET" "celery-task-meta-b0d3a29e-ac08-4e77-871e-b4d553502cc2" 
05:03:49.146086 IP <redis-ip-addr>.ec2.internal.6379 > <scheduler-ip-addr>.ec2.internal.45597: Flags [P.], seq 46:177, ack 731, win 210, options [nop,nop,TS val 4219564282 ecr 654768546], length 131: RESP "{"status": "SUCCESS", "traceback": null, "result": null, "task_id": "b0d3a29e-ac08-4e77-871e-b4d553502cc2", "children": []}" 

Die Nutzlast der Antwort von Redis ist klar JSON, also warum ist Sellerie versucht, es zu unpickle? Wir sind gerade dabei, von Airflow 1.7 auf 1.8 zu migrieren. Während unseres Rollouts haben wir eine Flotte von Airflow-Mitarbeitern mit v1.7 und eine weitere mit v1.8. Die Arbeiter sollten aus Schlangen mit disjunkten Arbeitsbelastungen ziehen, aber aufgrund eines Fehlers in einem unserer DAGs hatten wir eine TaskInstance, die von Airflow 1.8 geplant wurde und dann von einem Selleriearbeiter ausgeführt wurde, der über Airflow 1.7 gestartet wurde.

AIRFLOW-1038 den Serializer für Sellerie-Taskstatus von JSON (Standard) auf pickle geändert, so dass Mitarbeiter, die eine Version des Codes vor dieser Änderung ausführen, serialisieren die Ergebnisse in JSON und Scheduler eine Version des Codes, die dies enthalten change wird versuchen, die Ergebnisse durch erneutes Entpacken zu deserialisieren, was den obigen Fehler verursacht.

0

Überprüfen Sie, welche Art von sellery_result_backend Sie in der airflow.cfg konfiguriert haben. Versuchen Sie, es zu einem Datenbank-Backend (mysql etc) zu wechseln, wenn das nicht der Fall ist.

Wir sehen, dass mit einem Ampq-Backend (nur verfügbar auf Sellery 3.1 und darunter), Redis und RPC-Backend gibt es manchmal Probleme.

Verwandte Themen