Ich versuche asynchrone Operationen zu verstehen, die mit NDB eingeführt wurden, ich möchte @ndb.tasklet
verwenden, um einige meiner Arbeiten zu synchronisieren.Wie man eine asynchrone NDB-Methode überschreibt und ein eigenes Tasklet schreibt
Das einfache Beispiel string_id Generation in den überschriebenen get_or_insert_async
würde Ist das ein richtiger Weg, um die Dinge? Was kann hier verbessert werden?
@classmethod
@ndb.tasklet
def get_or_insert_async(cls, *args):
id = cls.make_string_id(*args)
model = yield super(MyModel, cls).get_or_insert_async(id)
raise ndb.Return(model)
Ein anderes Beispiel wäre, Dinge in einer Schleife in Fan-out irgendwie zu tun. Ist das richtig?
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
@ndb.tasklet
def internal_tasklet(data):
do_some_long_taking_stuff(data)
id = make_stuff_needed_for_id(data)
model = yield cls.get_or_insert_async(id)
model.long_processing(data)
yield model.put_async()
raise ndb.Return(None)
for data in some_collection:
# will it parallelise internal_tasklet execution?
yield internal_tasklet(data)
raise ndb.Return(None)
EDIT:
Wie das gesamte Konzept zu verstehen, sind yields
hier ein Future
Objekte zu schaffen, die dann parallel gesammelt werden (wenn möglich) und asynchron ausgeführt. Hab ich recht?
Nach Nicks Hinweis (ist es, was Sie gemeint?):
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
@ndb.tasklet
def internal_tasklet(data):
do_some_long_taking_stuff(data)
id = make_stuff_needed_for_id(data)
model = yield cls.get_or_insert_async(id)
model.long_processing(data)
raise ndb.Return(model) # change here
models = []
for data in some_collection:
# will it parallelise internal_tasklet execution?
m = yield internal_tasklet(data) # change here
models.appedn(m) # change here
keys = yield ndb.put_multi_async(models) # change here
raise ndb.Return(keys) # change here
EDIT:
Neue überarbeitete Version ...
@classmethod
@ndb.tasklet
def do_stuff(cls, some_collection):
@ndb.tasklet
def internal_tasklet(data):
do_some_long_taking_stuff(data)
id = make_stuff_needed_for_id(data)
model = yield cls.get_or_insert_async(id)
model.long_processing(data)
raise ndb.Return(model)
futures = []
for data in some_collection:
# tasklets won't run in parallel but while
# one is waiting on a yield (and RPC underneath)
# the other will advance it's execution
# up to a next yield or return
fut = internal_tasklet(data)) # change here
futures.append(fut) # change here
Future.wait_all(futures) # change here
models = [fut.get_result() for fut in futures]
keys = yield ndb.put_multi_async(models) # change here
raise ndb.Return(keys) # change here
Wäre es nicht 'make_string_id' Aufruf in' get_or_insert_async' synchronen und nur der zugrunde liegende Aufruf an die ursprüngliche 'get_or_insert_async' wirklich asynchron? –
Können Sie das zweite Beispiel in Ihrer Antwort umschreiben? Ich bin mir nicht sicher, ob Hexen zu fallen drohen, also würde die Schleife nicht auf jedes Element warten und alle (oder die meisten) 'internen_Tasklet'-Ausführungen würden parallelisiert. –
10 @WooYek Ja, aber make_string_id ist in Ihrem Snippet trotzdem synchron. Und wenn sie keine RPCs erstellen, ist es sinnlos, sie asynchron zu machen - Tasklets helfen nur, wenn RPCs Zeit benötigen. Bezüglich des zweiten Beispiels - das Problem ist das Anti-Pattern des Aufrufs von Ertrag in einer Schleife. Dies bedeutet, dass jede einzelne Aufgabe beendet wird, bevor sie zur nächsten übergeht. Jedes Mal, wenn Sie eine Schleife mit Ertrag haben, sollten Sie die Funktion ohne Ausbeute aufrufen, eine Liste der Ergebnisse zusammenstellen und den Ertrag auf dieser Liste aufrufen. –