2016-11-16 1 views
0

Ich habe kürzlich meine Sellerie-Installation auf 4.0 aktualisiert. Nach einigen Tagen des Ringens mit dem Upgrade-Prozess, habe ich es endlich zur Arbeit gebracht ... irgendwie. Einige Aufgaben werden zurückgegeben, die letzte Aufgabe jedoch nicht.Redis gibt kein Ergebnis zurück, nachdem Sellerie von 3.1 auf 4.0 aktualisiert wurde

Ich habe eine Klasse, SFF, die in nimmt und analysiert eine Datei:

# Constructor with I/O file 
def __init__(self, file): 

    # File data that's gonna get used a lot 
    sffDescriptor = file.fileno() 
    fileName = abspath(file.name) 

    # Get the pointer to the file 
    filePtr = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ) 

    # Get the header info 
    hdr = filePtr.read(HEADER_SIZE) 
    self.header = SFFHeader._make(unpack(HEADER_FMT, hdr)) 

    # Read in the palette maps 
    print self.header.onDemandDataSize 
    print self.header.onLoadDataSize 
    palMapsResult = getPalettes.delay(fileName, self.header.palBankOff - HEADER_SIZE, self.header.onDemandDataSize, self.header.numPals) 

    # Read the sprite list nodes 
    nodesStart = self.header.sprListOff 
    nodesEnd = self.header.palBankOff 
    print nodesEnd - nodesStart 
    sprNodesResult = getSprNodes.delay(fileName, nodesStart, nodesEnd, self.header.numSprites) 

    # Get palette data 
    self.palettes = palMapsResult.get() 

    # Get sprite data 
    spriteNodes = sprNodesResult.get() 

    # TESTING 
    spritesResultSet = ResultSet([]) 
    numSpriteNodes = len(spriteNodes) 
    # Split the nodes into chunks of size 32 elements 
    for x in xrange(0, numSpriteNodes, 32): 
     spritesResult = getSprites.delay(spriteNodes, x, x+32, fileName, self.palettes, self.header.palBankOff, self.header.onDemandDataSizeTotal) 
     spritesResultSet.add(spritesResult) 
     break # REMEMBER TO REMOVE FOR ENTIRE SFF 

    self.sprites = spritesResultSet.join_native() 

Es spielt keine Rolle, ob es sich um eine einzige Aufgabe ist, die die gesamte spritesResult zurückkehrt, oder wenn ich es ein ResultSet mit gespalten Das Ergebnis ist immer das gleiche: Die Python-Konsole, die ich gerade verwende, hängt entweder an oder spritesResult.get() (je nachdem, wie ich es formatiere). Hier

ist die Aufgabe in Frage:

@task 
def getSprites(nodes, start, end, fileName, palettes, palBankOff, onDemandDataSizeTotal): 
sprites = [] 

with open(fileName, "rb") as file: 
    sffDescriptor = file.fileno() 
    sffData = mmap.mmap(sffDescriptor, 0, flags=mmap.MAP_SHARED, prot=mmap.PROT_READ) 

    for node in nodes[start:end]: 
     sprListNode = dict(SprListNode._make(node)._asdict()) # Need to convert it to a dict since values may change. 
     #print node 
     #print sprListNode 

     # If it's a linked sprite, the data length is 0, so get the linked index. 
     if sprListNode['dataLen'] == 0: 
      sprListNodeTemp = SprListNode._make(nodes[sprListNode['index']]) 
      sprListNode['dataLen'] = sprListNodeTemp.dataLen 
      sprListNode['dataOffset'] = sprListNodeTemp.dataOffset 
      sprListNode['compression'] = sprListNodeTemp.compression 

     # What does the offset need to be? 
     dataOffset = sprListNode['dataOffset'] 
     if sprListNode['loadMode'] == 0: 
      dataOffset += palBankOff #- HEADER_SIZE 
     elif sprListNode['loadMode'] == 1: 
      dataOffset += onDemandDataSizeTotal #- HEADER_SIZE 

     #print sprListNode 

     # Seek to the data location and "read" it in. First 4 bytes are just the image length 
     start = dataOffset + 4 
     end = dataOffset + sprListNode['dataLen'] 
     #sffData.seek(start) 

     compressedSprite = sffData[start:end] 

     # Create the sprite 
     sprite = Sprite(sprListNode, palettes[sprListNode['palNo']], np.fromstring(compressedSprite, dtype=np.uint8)) 
     sprites.append(sprite) 

return json.dumps(sprites, cls=SpriteJSONEncoder) 

Ich weiß, dass es die Anweisung return erreicht, denn wenn wir einen Druck direkt darüber setzen, es im Sellerie Fenster gedruckt wird. Ich weiß auch, dass die Aufgabe bis zum Abschluss ausgeführt wird, weil ich die folgende Meldung vom Arbeiter erhalten:

[2016.11.16 00: 03: 33.639: INFO/PoolWorker-4] Aufgabe framedatabase.tasks.getSprites [285ac9b1-09b4-4cf1-a251-da6212863832] in 0.137236133218s erfolgreich: '[{"width": 120, "palNo": 30, "group": 9000, "xAxis": 0, "yAxis": 0, " Daten ": ...‘

Hier sind meine Sellerie Einstellungen in settings.py:

# Celery settings 
BROKER_URL='redis://localhost:1717/1' 
CELERY_RESULT_BACKEND='redis://localhost:1717/0' 
CELERY_IGNORE_RESULT=False 
CELERY_IMPORTS = ("framedatabase.tasks",) 

... und meine celery.py:

from __future__ import absolute_import 

import os 

from celery import Celery 

# set the default Django settings module for the 'celery' program. 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'framedatabase.settings') 

from django.conf import settings # noqa 

app = Celery('framedatabase', backend='redis://localhost:1717/1', broker="redis://localhost:1717/0", 
    include=['framedatabase.tasks']) 

# Using a string here means the worker will not have to 
# pickle the object when using Windows. 
app.config_from_object('django.conf:settings', namespace='CELERY') 
app.autodiscover_tasks() 


@app.task(bind=True) 
def debug_task(self): 
    print('Request: {0!r}'.format(self.request)) 

Antwort

0

Das Problem gefunden. Offenbar führte sie in eine Sackgasse wie im Abschnitt erwähnten Dokumentation im Sellerie „synchronous Unteraufgaben Vermeiden Start“ hier: http://docs.celeryproject.org/en/latest/userguide/tasks.html#tips-and-best-practices

So habe ich von der Linie los:

sprNodesResult.get() 

und verändert das Endergebnis ein Kette:

self.sprites = chain(getSprNodes.s(fileName, nodesStart, nodesEnd, self.header.numSprites), 
    getSprites.s(0,32,fileName,self.palettes,self.header.palBankOff,self.header.onDemandDataSizeTotal))().get() 

Und es funktioniert! Jetzt muss ich nur noch einen Weg finden, das zu teilen, wie ich es möchte!

Verwandte Themen