2017-08-09 5 views
3

Ich bin neu in der reaktiven Programmierung und habe Schwierigkeiten mit "alles kann ein Strom sein" Mantra. Ich betrachte das folgende Szenario - Ich habe einen Strom von websocket Ereignisse wie diese definiertem:Wie storniere ich ein Ereignis im reaktiven Strom?

Rx.Observable.create((observer) => { 

    io.on('connect', function(socket){ 

     socket.on("enroll", function(player) { 
     observer.next({ 
      event: 'enroll', 
      player, 
      socket 
     }); 
     }); 

     socket.on('resign', function(player){ 
     observer.next({ 
      event: 'resign', 
      player, 
      socket 
     }); 
     }); 

    }); 

    return { 
     dispose: io.close 
    }; 
    }); 

Dann kann ich etwas tun, wie

enrollmentStream = events$ 
     .filter(find({ event: "enroll" })) 
     .map(pick('player')); 

Und ebenfalls

resignationStream = events$ 
     .filter(find({ event: "resign" })) 
     .map(pick('player')); 

Ich möchte registrierte Spieler in einem Stream sammeln, der sie in Vierergruppen zusammenfasst, aber das sollte natürlich nur für Benutzer geschehen, die sich in der Registrierung befinden streamen aber nicht in resignationStream oder zumindest war das letzte Event Anmeldung. Wie mache ich das?

Hier ist das Marmor Diagramm.

Marble diagram

Es gibt 5 Spieler, die sich einschreiben. Das Spiel beginnt, wenn 4 Spieler angemeldet sind. Beachten Sie, dass der zweite Spieler (violett) sich einschreibt, aber dann zurücktritt, so dass das Spiel nicht mit blauem Marmor beginnt, sondern mit dem nächsten - gelben - Grund, erst danach sind wirklich 4 Spieler bereit.

Wahrscheinlich sollte es einige Stream-Betrieb geben wie "ohne" ... ist da?

+2

Ich bin mir nicht sicher, ob ich Ihrer Anforderung folge. Könnten Sie ein Marmor- oder Sequenzdiagramm der Reihe von Aktionen hinzufügen, die Sie verarbeiten müssten? – paulpdaniels

+0

Bitte werfen Sie einen Blick auf die aktualisierte Frage. – kboom

Antwort

2

denke ich in diesem Szenario Sie combineLatest() und scan() Operatoren verwenden könnte und dann eine Liste von unresigned Spieler machen sich selbst:

const bufferedEnrollment = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 
const bufferedResignation = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 

Observable.combineLatest(bufferedEnrollment, bufferedResignation) 
    .map(values => { 
    const enrolled = values[0]; 
    const resigned = values[1]; 

    // remove resigned players from `enrolled` array 
    return enrolled; 
    }) 
    .filter(players => players.length === 4) 
    .subscribe(...) 

Der scan() Operator nur verwendet wird, die Spieler in ein Array zu sammeln. Wenn Sie beispielsweise das Array zurücksetzen möchten, können Sie es mit einem anderen Observable zusammenführen.

enrollmentStream 
    .merge(resetStream) 
    .scan((acc, val) => { 
    if (!val) { 
     return []; 
    } 
    acc.push(val); 
    return acc; 
    }, []); 

(aus offensichtlichen Gründen habe ich diesen Code nicht getestet).

Verwandte Themen