2017-09-28 4 views
1

Ich arbeite an etwas, das Daten aus einer Warteschlange aufzeichnet. Es war einfach genug, die Warteschlange zu einem Observable zu verarbeiten, so dass mehrere Endpunkte in meinem Code die Informationen in der Warteschlange empfangen können.Muster für Observables mit Bestätigung

Darüber hinaus kann ich sicher sein, dass die Informationen in der Reihenfolge ankommt. Dieses Bit funktioniert auch gut, da die Observables dafür sorgen. Aber, ein schwieriges Bit ist, dass ich nicht möchte, dass der Beobachter über das nächste Ding benachrichtigt wird, bis es die Verarbeitung der vorherigen Sache abgeschlossen hat. Aber die Verarbeitung durch den Observer ist asynchron.

Als ein konkreteres Beispiel, das wahrscheinlich einfach genug zu folgen ist. Stellen Sie sich vor, meine Warteschlange enthält URLs. Ich zeige diese als Observable in meinem Code. Ich abonniere einen Observer, dessen Aufgabe es ist, die URLs zu holen und den Inhalt auf die Festplatte zu schreiben (dies ist ein künstliches Beispiel, also sollte man sich mit diesen Besonderheiten nicht befassen). Der wichtige Punkt ist, dass das Holen und Speichern asynchron ist. Mein Problem ist, dass ich nicht möchte, dass der Beobachter die "nächste" URL vom Observable erhält, bis er die vorherige Verarbeitung abgeschlossen hat.

Aber der Anruf an next auf der Observer-Schnittstelle gibt void zurück. Es gibt also keine Möglichkeit für den Observer, mir mitzuteilen, dass die asynchrone Aufgabe tatsächlich abgeschlossen wurde.

Irgendwelche Vorschläge? Ich vermute, dass es wahrscheinlich einen Operator gibt, der so programmiert werden könnte, dass er zukünftige Werte zurückhält (sie in den Speicher stellen?), Bis er irgendwie wusste, dass der Observer bereit dafür war. Aber ich hatte gehofft, dass so etwas schon existierte.

Antwort

1

ähnliche Anwendungsfall i in lief vor

window.document.onkeydown=(e)=>{ 
 
    return false 
 
} 
 
let count=0; 
 
let asyncTask=(name,time)=>{ 
 
    time=time || 2000 
 
    return Rx.Observable.create(function(obs) { 
 
     setTimeout(function() { 
 
     count++ 
 
     obs.next('task:'+name+count); 
 
      console.log('Task:',count ,' ', time, 'task complete') 
 
     obs.complete(); 
 
     }, time); 
 
    }); 
 
} 
 

 
let subject=new Rx.Subject() 
 
let queueExec$=new Rx.Subject() 
 

 

 
Rx.Observable.fromEvent(btnA, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('A',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnB, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('B',4000)) 
 
}) 
 

 
Rx.Observable.fromEvent(btnC, 'click').subscribe(()=>{ 
 
queueExec$.next(asyncTask('C',4000)) 
 
}) 
 

 
    queueExec$.concatMap(value=>value) 
 
    .subscribe(function(data) { 
 
     console.log('onNext', data); 
 
    }, 
 
    function(error) { 
 
     console.log('onError', error); 
 
    },function(){ 
 
console.log('completed') 
 
});

+0

Also, wenn ich das verstehe, haben Sie Ihre 'asyncTask' gibt ein Observable zurück, kein Wert. Dieses Observable veröffentlicht genau einen Wert und schließt dann. Aber wenn Sie dann 'concatMap' ausführen, um alle zusammen zu verketten, wird der Effekt darin bestehen, dass Sie die Werte vom nächsten Observable erst erhalten, wenn der erste * abgeschlossen * ist (was der Fall ist, wenn der Async abgeschlossen ist). Das ist eine interessante Idee. Ich habe etwas gefunden, das besser zu meinem Fall passt, aber das ist definitiv eine interessante Idee für andere Anwendungsfälle. –

+0

Mit diesem Muster können Sie alles an der asynchronen Warteschlange werfen, solange es beobachtbar ist. In einigen Fällen ist es flexibler –

1

Was Sie klingt wie "Gegendruck" beschreiben. Sie können darüber in der RxJS 4 Dokumentation https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md nachlesen. Dies bezieht sich jedoch auf Operatoren, die in RxJS 5 nicht existieren. Schauen Sie sich zum Beispiel "Controlled Observables" an, die sich auf das beziehen sollten, was Sie brauchen.

Ich glaube, Sie könnten das gleiche mit concatMap und eine Instanz von Subject erreichen:

const asyncOperationEnd = new Subject(); 

source.concatMap(val => asyncOperationEnd 
    .mapTo(void 0) 
    .startWith(val) 
    .take(2) // that's `val` and the `void 0` that ends this inner Observable 
) 
    .filter(Boolean) // Always ignore `void 0` 
    .subscribe(val => { 
    // do some async operation... 
    // call `asyncOperationEnd.next()` and let `concatMap` process another value 
    }); 

Fro Ihrer Beschreibung scheint es tatsächlich wie der „Beobachter“ Sie funktioniert wie Thema sind zu erwähnen, so dass es vielleicht mehr machen würde Sinn, eine benutzerdefinierte Subject-Klasse zu erstellen, die Sie in jeder Observable-Kette verwenden können.