2017-02-27 19 views
0

Python Anruf ansibleApi mit Sellerie Rückkehr Keine, ich habe ein paar Tage gesucht.Es funktioniert gut mit Call-Deploy-Funktion ohne Sellerie, aber mit Sellerie meine Code-Anruf AnsibleApi zurück keine.django + Sellerie + ansibleApi zurück Keine

Schritte reproduzieren.

1.tasks.py

from celery import shared_task 
from .deploy_tomcat2 import django_process 


@shared_task 
def deploy(jira_num): 
    #return 'hello world {0}'.format(jira_num) 
    #rdb.set_trace() 
    return django_process(jira_num)  

2.deploy_tomcat2.py

from .AnsibleApi import CallApi 

def django_process(jira_num): 
    server = '10.10.10.30' 
    name = 'abc' 
    port = 11011 
    code = 'efs' 
    jdk = '1.12.13' 
    jvm = 'xxxx' 

    if str.isdigit(jira_num): 
     # import pdb 
     # pdb.set_trace() 
     call = CallApi(server,name,port,code,jdk,jvm) 
     return call.run_task() 

3.AnsibleApi.py

#!/usr/bin/env python 

import logging 
from .Logger import Logger 
from django.conf import settings 
from collections import namedtuple 
from ansible.parsing.dataloader import DataLoader 
from ansible.vars import VariableManager 
from ansible.inventory import Inventory 
from ansible.playbook.play import Play 
from ansible.executor.task_queue_manager import TaskQueueManager 
from ansible.plugins.callback import CallbackBase 

Log = Logger('/tmp/auto_deploy_tomcat.log',logging.INFO) 


class ResultCallback(CallbackBase): 
    def __init__(self, *args, **kwargs): 
     super(ResultCallback ,self).__init__(*args, **kwargs) 
     self.host_ok = {} 
     self.host_unreachable = {} 
     self.host_failed = {} 

    def v2_runner_on_unreachable(self, result): 
     self.host_unreachable[result._host.get_name()] = result 

    def v2_runner_on_ok(self, result, *args, **kwargs): 
     self.host_ok[result._host.get_name()] = result 

    def v2_runner_on_failed(self, result, *args, **kwargs): 
     self.host_failed[result._host.get_name()] = result 


class CallApi(object): 
    user = settings.SSH_USER 
    ssh_private_key_file = settings.SSH_PRIVATE_KEY_FILE 
    results_callback = ResultCallback() 
    Options = namedtuple('Options', 
         ['connection', 'module_path', 'private_key_file', 'forks', 'become', 'become_method', 
          'become_user', 'check']) 

    def __init__(self,ip,name,port,code,jdk,jvm): 
     self.ip = ip 
     self.name = name 
     self.port = port 
     self.code = code 
     self.jdk = jdk 
     self.jvm = jvm 
     self.results_callback = ResultCallback() 
     self.results_raw = {} 

    def _gen_user_task(self): 
     tasks = [] 
     deploy_script = 'autodeploy/tomcat_deploy.sh' 
     dst_script = '/tmp/tomcat_deploy.sh' 
     cargs = dict(src=deploy_script, dest=dst_script, owner=self.user, group=self.user, mode='0755') 
     args = "%s %s %d %s %s '%s'" % (dst_script, self.name, self.port, self.code, self.jdk, self.jvm) 
     tasks.append(dict(action=dict(module='copy', args=cargs),register='shell_out')) 
     tasks.append(dict(action=dict(module='debug', args=dict(msg='{{shell_out}}')))) 
     # tasks.append(dict(action=dict(module='command', args=args))) 
     # tasks.append(dict(action=dict(module='command', args=args), register='result')) 
     # tasks.append(dict(action=dict(module='debug', args=dict(msg='{{result.stdout}}')))) 
     self.tasks = tasks 

    def _set_option(self): 
     self._gen_user_task() 

     self.variable_manager = VariableManager() 
     self.loader = DataLoader() 
     self.options = self.Options(connection='smart', module_path=None, private_key_file=self.ssh_private_key_file, forks=None, 
            become=True, become_method='sudo', become_user='root', check=False) 
     self.inventory = Inventory(loader=self.loader, variable_manager=self.variable_manager, host_list=[self.ip]) 
     self.variable_manager.set_inventory(self.inventory) 

     play_source = dict(
     name = "auto deploy tomcat", 
      hosts = self.ip, 
      remote_user = self.user, 
      gather_facts='no', 
      tasks = self.tasks 
     ) 
     self.play = Play().load(play_source, variable_manager=self.variable_manager, loader=self.loader) 

    def run_task(self): 
     self.results_raw = {'success':{}, 'failed':{}, 'unreachable':{}} 
     tqm = None 
     from celery.contrib import rdb;rdb.set_trace() 
     #import pdb;pdb.set_trace() 
     self._set_option() 
     try: 
      tqm = TaskQueueManager(
       inventory=self.inventory, 
       variable_manager=self.variable_manager, 
       loader=self.loader, 
       options=self.options, 
       passwords=None, 
       stdout_callback=self.results_callback, 
      ) 
      result = tqm.run(self.play) 
     finally: 
      if tqm is not None: 
       tqm.cleanup() 

     for host, result in self.results_callback.host_ok.items(): 
      self.results_raw['success'][host] = result._result 

     for host, result in self.results_callback.host_failed.items(): 
      self.results_raw['failed'][host] = result._result 

     for host, result in self.results_callback.host_unreachable.items(): 
      self.results_raw['unreachable'][host]= result._result 
     Log.info("result is :%s" % self.results_raw) 
     return self.results_raw 

4.celery Arbeiter

celery -A jira worker -Q queue.ops.deploy -n "deploy.%h" -l info 

5.produce msg:

deploy.apply_async(args=['150'], queue='queue.ops.deploy', routing_key='ops.deploy') 

Antwort

0

Es scheint OK.
Die einzige Frage ist None ist wirklich die deploy Aufgabenrückgabe?
Es wird besser sein, wenn Sie Ihre Sellerie Arbeiter Protokoll veröffentlichen können.

+0

'[2017.02.27 16: 46: 08.554: INFO/MainProcess] Empfangene Aufgabe: autodeploy.tasks.deploy [a963d1f3-cc1b-48da-9701-28297f7b68a5]' ' [2017.02.27 16: 46: 08,786: INFO/PoolWorker-2] Ergebnis ist: {'Erfolg': {}, 'fehlgeschlagen': {}, 'unerreichbar': {}} ' ' [2017-02-27 16:46: 08.808: INFO/PoolWorker-2] Aufgabe autodeploy.tasks.deploy [a963d1f3-cc1b-48da-9701-28297f7b68a5] Erfolg in 0.18285173299955204s: {'Erfolg': {}, 'fehlgeschlagen': {}, 'unerreichbar': {}} ' –

+0

看来 你 找到 了 问题' Tasks dürfen Subprozesse nicht starten', 不过 不 好像 不 建议Htt 这个 限制, 你 可以 参考 一下 [这个 讨论] (https://github.com/sellery/sellery/issues/1709) 好像 要 自己 控制 (1, 换成 线程; 2.daemon = Falsch; 其他 没 仔细看 :)). 'TaskQueueManager' 没 用过, 没法 帮 你 ... – Cheney

+0

没有 技能 还 没有 bekommen, 晚点 再 研究 一下. –

0

gibt es zwei Verfahren, um dieses Problem zu lösen, deaktivieren assert: 1. Wo Sellerie beginnt gesetzt export PYTHONOPTIMIZE = 1 oder Start Sellerie mit diesem Parameter -O OPTIMIZATION 2.disable Python Paket Multiprozessing process.py Leitung 102:

assert not _current_process._config.get('daemon'), \ 
       'daemonic processes are not allowed to have children' 
Verwandte Themen